Stalled View updates due to missing `ce-subject` attribute

Hello!

Ignoring the details, I have an entity A that emits events. Then I have an action B that writes those events to a topic. Then I have a view C subscribing to that topic.

service B {
  option (kalix.codegen).action = {};

  rpc OnFoo(domain.Foo) returns (journal.Foo) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "foo"
    };
    option (kalix.method).eventing.out = {
      topic: "foo-journal"
    };
  }
}

The issue is that view C does not update.

2023-05-03 09:11:37,356 ERROR kalix.javasdk.impl.DiscoveryImpl - Error reported from Kalix system: KLX-00121 An incoming event for the view [View] was missing the subject id attribute `ce-subject`, view updates are stalled.

Each update passed to a view must have a subject id, used as a primary key in the view, each unique subject-id can only have one entry in the view, if the update comes from a message broker the subject id is defined with the cloud event attribute [ce-subject]. 

To fix this, I tried the following in B:

effects.reply(event, actionContext.metadata)

and

effects.reply(event, Metadata.empty.set(MetadataImpl.CeSubject, actionContext.eventSubject.get))

However, neither of the two works, and view C is still stalled.

How can I pass the subject id from an entity to a topic in B?

Hi @glop.postbox.0u

Let me try to reproduce it and I will get back to you.

OK, everything works correctly on my machine, although I was testing it with the Java (proto) SDK. I could check it again with Scala, but before that, are you sure that you have cleared the old data (the messages published without CeSubject) from the topic?

You’re right, sorry I missed the “clean the topic” part.

I’m curious about the reasoning behind this decision, though. In the “Publishing Entity events to a Topic” section, the code used is:

override def increase(valueIncreased: ValueIncreased): Action.Effect[Increased] = {
    effects.reply(Increased(valueIncreased.value)) 
}

And this code does not forward the event’s subject by default. Why would that be?

Doing it manually seems a bit unnecessary.

  val replyMetadata = {
    actionContext.eventSubject.fold(Metadata.empty) { subject =>
      Metadata.empty.set(MetadataImpl.CeSubject, subject)
    }
  }
  ...
  effects.reply(event, replyMetadata)
}

I understand your point, especially in the context of views, where “ce-subject” is mandatory. The thing is that publishing to a topic is a more general-purpose solution. Most likely it will be used to share data with the outside world, where you might not want to expose the internal subject value, or you might want to map it to something else. Also, an action method that publishes to a topic can be used as an endpoint, where you could publish individual messages (in this case the metadata is empty, but it can be populated with request headers). All in all, it’s a matter of balance between things that work out of the box and things that are flexible to change.
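To make the trade-off concrete, here is a minimal sketch of the two choices mentioned above: forwarding the subject unchanged, or mapping it before publishing. It uses plain Scala stand-ins rather than the Kalix SDK, so `Meta`, `SubjectForwarding`, `forward`, and `mapped` are hypothetical names for illustration only; the only thing taken from this thread is the `ce-subject` attribute name.

```scala
// Stand-in for message metadata. This is NOT the Kalix SDK Metadata class,
// just a small immutable map wrapper used for illustration.
final case class Meta(entries: Map[String, String]) {
  def set(key: String, value: String): Meta = Meta(entries + (key -> value))
  def get(key: String): Option[String] = entries.get(key)
}

object Meta {
  val empty: Meta = Meta(Map.empty)
}

object SubjectForwarding {
  // CloudEvents attribute name, as quoted in the KLX-00121 error message.
  val CeSubject = "ce-subject"

  // Forward the incoming subject unchanged; empty metadata if there is none.
  def forward(incoming: Option[String]): Meta =
    incoming.fold(Meta.empty)(subject => Meta.empty.set(CeSubject, subject))

  // Publish a mapped subject instead, so the internal id is not exposed.
  def mapped(incoming: Option[String])(toExternal: String => String): Meta =
    forward(incoming.map(toExternal))
}
```

With the real SDK, the equivalent of `forward` is the `replyMetadata` snippet earlier in the thread; the mapped variant would only change the value passed to `set`.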

BTW, with service to service eventing you don’t have to pass metadata manually, we will do this for you, since everything “stays” in the Kalix ecosystem.


Understood, thanks @aludwiko