Weird error in view: subject id attribute missing

I’m getting the following error (locally) about a missing subject id attribute, but I don’t see any reference to that in the docs. What am I missing?

The full error:

[info] 10:20:11.542 [kalix-akka.actor.default-dispatcher-3] ERROR kalix.javasdk.impl.DiscoveryImpl - Error reported from Kalix system: KLX-00121 An incoming event for the view [UserFollowByFollower] was missing the subject id attribute `ce-subject`, view updates are stalled.
[info] Each update passed to a view must have a subject id, used as a primary key in the view, each unique subject-id can only have one entry in the view, if the update comes from a message broker the subject id is defined with the cloud event attribute [ce-subject]. 
[info] At com/hiive/user/view/userfollow_byfollower_view.proto:10:1:
[info] service UserFollowByFollower {
[info]   option (kalix.codegen) = {
[info]     view: {}
[info]   };
[info]   rpc UpdateUserFollow(domain.UserFollowState) returns (api.UserFollow) {

Here’s the view definition:

syntax = "proto3";

package com.hiive.user.view;

import "google/api/annotations.proto";
import "com/hiive/user/domain/userfollow_domain.proto";
import "com/hiive/user/api/userfollow_api.proto";
import "kalix/annotations.proto";

service UserFollowByFollower {
  option (kalix.codegen) = {
    view: {}
  };

  rpc UpdateUserFollow(domain.UserFollowState) returns (api.UserFollow) {
    option (kalix.method).eventing.in = {
      value_entity: "userfollows"
    };
    option (kalix.method).view.update = {
      table: "userfollows_byfollower_v1"
    };
    option (google.api.http) = {
      post: "/userfollow/byfollower"
      body: "*"
    };
    option (kalix.method).jwt = {
      validate: BEARER_TOKEN
    };
  }

  rpc GetUserFollows(UserFollowByFollowerRequest) returns (api.UserFollow) {
    option (kalix.method).view.query = {
      query: "SELECT * FROM userfollows_byfollower_v1 WHERE followerId = :followerId AND active = true ORDER BY followDatetime DESC"
    };
  }

}

message UserFollowByFollowerRequest {
  string followerId = 1;
}

Here’s the related entity domain definition:

syntax = "proto3";

package com.hiive.user.domain;

import "google/protobuf/timestamp.proto";

message UserFollowState {

  string id = 1;
  string followerId = 2;
  string followedId = 3;
  google.protobuf.Timestamp followDatetime = 4;
  google.protobuf.Timestamp unfollowDatetime = 5;
  bool active = 6;

}

Here’s the related entity api definition:

syntax = "proto3";

package com.hiive.user.api;

import "google/protobuf/empty.proto";
import "kalix/annotations.proto";
import "google/protobuf/timestamp.proto";

service UserFollowService {
  option (kalix.codegen) = {
    value_entity: {
      name: "com.hiive.user.domain.UserFollow"
      entity_type: "userfollows"
      state: "com.hiive.user.domain.UserFollowState"
    }
  };

  rpc Follow(FollowUserRequest) returns (google.protobuf.Empty)  {
    option (kalix.method).entity.key_generator = VERSION_4_UUID;
  };

  rpc Get(GetUserFollowRequest) returns (UserFollow) {}

  rpc Unfollow(UnfollowUserRequest) returns (google.protobuf.Empty) {}

}

message UserFollow {
  string id = 1 [(kalix.field).entity_key = true];
  string followerId = 2;
  string followedId = 3;
  google.protobuf.Timestamp followDatetime = 4;
  google.protobuf.Timestamp unfollowDatetime = 5;
  bool active = 6;
}

message FollowUserRequest {
  string id = 1;
  string followerId = 2;
  string followedId = 3;
}

message GetUserFollowRequest {
  string id = 1 [(kalix.field).entity_key = true];
}

message UnfollowUserRequest {
  string id = 1 [(kalix.field).entity_key = true];
}

That is strange and is probably a Kalix bug. The error message should only show up if you feed a view from a message broker and forget to mark the messages in the topic with a subject id.

I’ll list it in our internal issue tracker so it will get investigated.

@maristi thanks for reporting that. One thing I’ve noticed is that you should use the transform_updates: true option when your View model is different from the VE model (check out the example for a view based on an ES entity).
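
A minimal sketch of that change, applied to the update method of the view from the original post. Only the eventing and view options are shown; the rpc still sits inside the UserFollowByFollower service, and the table name is the one already used in the thread:

```protobuf
rpc UpdateUserFollow(domain.UserFollowState) returns (api.UserFollow) {
  option (kalix.method).eventing.in = {
    value_entity: "userfollows"
  };
  option (kalix.method).view.update = {
    table: "userfollows_byfollower_v1"
    // The view row type (api.UserFollow) differs from the entity state
    // type (domain.UserFollowState), so the update has to go through
    // the view's own update handler.
    transform_updates: true
  };
}
```

With this flag set, the generated view class is expected to implement the update handler that maps the entity state to the view row.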

Even so, I was not able to reproduce your problem. Can you share your VE implementation, and some hints on how to reproduce this exception?

Ahh, can’t believe I missed transform_updates, Andrzej, but yeah, even after fixing that the issue persists. Support asked me via a case to upgrade to 1.10, and I did that too.

One interesting bit: unlike what the run-locally instructions describe, I can’t seem to stop or delete the proxy (after stopping it with Ctrl-C), so I’m wondering if there could be bad data in there even if the view definition and implementation are correct now:

% docker stop kalix-proxy
Error response from daemon: No such container: kalix-proxy
% docker rm kalix-proxy  
Error: No such container: kalix-proxy

I’m posting the VE implementation you requested + the view implementation now that I’ve tagged it for transform_updates:

class UserFollow(context: ValueEntityContext) extends AbstractUserFollow {

  override def emptyState: UserFollowState = UserFollowState.defaultInstance

  override def follow(curState: UserFollowState, followRequest: api.FollowUserRequest): ValueEntity.Effect[Empty] = {
    val curTime = DateUtil.getCurrentPBTimestampOption()
    val newState = curState.copy(
      id = context.entityId,
      followerId = followRequest.followerId,
      followedId = followRequest.followedId,
      followDatetime = curTime,
      active = true
    )
    effects.updateState(newState).thenReply(Empty.defaultInstance)
  }

  override def get(curState: UserFollowState, getUserFollowRequest: api.GetUserFollowRequest): ValueEntity.Effect[api.UserFollow] =
    effects.reply(convertToApi(curState))

  override def unfollow(curState: UserFollowState, unfollowUserRequest: api.UnfollowUserRequest): ValueEntity.Effect[Empty] = {
    if (curState.active) {
      val curTime = DateUtil.getCurrentPBTimestampOption()
      val newState = curState.copy(
        unfollowDatetime = curTime,
        active = false
      )
      effects.updateState(newState).thenReply(Empty.defaultInstance)
    } else {
      effects.error("Already unfollowed")
    }
  }


  private def convertToApi(followState: UserFollowState): api.UserFollow =
    api.UserFollow(
      followState.id,
      followState.followerId,
      followState.followedId,
      followState.followDatetime,
      followState.unfollowDatetime,
      followState.active
    )

}

class UserFollowByFollowerView(context: ViewContext) extends AbstractUserFollowByFollowerView {

  override def emptyState: UserFollow = UserFollow.defaultInstance

  override def updateUserFollow(state: UserFollow, userFollowState: UserFollowState): UpdateEffect[UserFollow] = {

    val summary = state.copy(
      id = userFollowState.id,
      followerId = userFollowState.followerId,
      followedId = userFollowState.followedId,
      followDatetime = userFollowState.followDatetime,
      unfollowDatetime = userFollowState.unfollowDatetime,
      active = userFollowState.active
    )
    effects.updateState(summary)
  }
}

Thanks!

OMG I’m an idiot! It’s fixed now…

As you can see even in my initial post, I had added the http and jwt annotations to the view’s update method, not the get method. Moved them over and it started working fine.
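
For anyone hitting the same thing, here is a sketch of the corrected view definition. It assumes nothing changed besides moving the two annotations from UpdateUserFollow to GetUserFollows and adding transform_updates as discussed earlier in the thread; everything else is copied from the original post:

```protobuf
syntax = "proto3";

package com.hiive.user.view;

import "google/api/annotations.proto";
import "com/hiive/user/domain/userfollow_domain.proto";
import "com/hiive/user/api/userfollow_api.proto";
import "kalix/annotations.proto";

service UserFollowByFollower {
  option (kalix.codegen) = {
    view: {}
  };

  // Update method: fed only by the value entity, so it carries
  // no HTTP or JWT options.
  rpc UpdateUserFollow(domain.UserFollowState) returns (api.UserFollow) {
    option (kalix.method).eventing.in = {
      value_entity: "userfollows"
    };
    option (kalix.method).view.update = {
      table: "userfollows_byfollower_v1"
      transform_updates: true
    };
  }

  // Query method: this is the externally callable endpoint, so the
  // HTTP mapping and JWT validation belong here.
  rpc GetUserFollows(UserFollowByFollowerRequest) returns (api.UserFollow) {
    option (kalix.method).view.query = {
      query: "SELECT * FROM userfollows_byfollower_v1 WHERE followerId = :followerId AND active = true ORDER BY followDatetime DESC"
    };
    option (google.api.http) = {
      post: "/userfollow/byfollower"
      body: "*"
    };
    option (kalix.method).jwt = {
      validate: BEARER_TOKEN
    };
  }
}

message UserFollowByFollowerRequest {
  string followerId = 1;
}
```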

Apologies for wasting everyone’s time, though perhaps it would be good to add a little bit of validation there so this is caught at compile time.

Yes, with the next release this won’t be possible anymore. I’ve just added extra validation for it.
