I’ve reviewed my config and don’t see anything that differs from the docs. The strange thing is that I can increment the counter multiple times (it’s a replicated_counter_map) without any errors, but when I try to retrieve it, it fails with a ClassCastException.
Here’s some debug logging of the operations that took place, followed by the error thrown by the RE itself and the error from the controller action that was trying to query it:
[info] 22:12:51.285 [kalix-akka.actor.default-dispatcher-10] DEBUG com.hiive.post.domain.LikePost - liking post bf7ef948-4042-42b1-b863-d29439821d15 by 2 likes
[info] 22:12:51.285 [kalix-akka.actor.default-dispatcher-10] DEBUG com.hiive.post.domain.LikePost - Saving like LikePostState(,aad6e03a-c3cd-4c3d-85af-fbc5e1d7b25c,,2,Some(Timestamp(1669183971,0,UnknownFieldSet(Map()))),Map(1669183971 -> 2),UnknownFieldSet(Map()))
[info] 22:12:52.018 [kalix-akka.actor.default-dispatcher-6] DEBUG com.hiive.post.domain.PostStats - Increasing likes from: kalix.scalasdk.replicatedentity.ReplicatedCounterMap@4cc95b89
[info] 22:12:52.018 [kalix-akka.actor.default-dispatcher-6] DEBUG com.hiive.post.domain.PostStats - Increasing likes to: kalix.scalasdk.replicatedentity.ReplicatedCounterMap@2e38287c
[info] 22:13:46.813 [kalix-akka.actor.default-dispatcher-10] ERROR kalix.scalasdk.impl.replicatedentity.JavaReplicatedEntityAdapter - Terminating entity [bf7ef948-4042-42b1-b863-d29439821d15] due to unexpected failure for command [GetStats]
[info] kalix.javasdk.impl.EntityExceptions$EntityException: Unexpected failure: java.lang.ClassCastException: class kalix.javasdk.impl.replicatedentity.ReplicatedCounterMapImpl cannot be cast to class kalix.scalasdk.replicatedentity.ReplicatedCounterMap (kalix.javasdk.impl.replicatedentity.ReplicatedCounterMapImpl and kalix.scalasdk.replicatedentity.ReplicatedCounterMap are in unnamed module of loader 'app')
[info] at kalix.javasdk.impl.EntityExceptions$EntityException$.apply(EntityExceptions.scala:46)
[info] at kalix.javasdk.impl.replicatedentity.ReplicatedEntitiesImpl$EntityRunner.liftedTree1$1(ReplicatedEntitiesImpl.scala:179)
[info] at kalix.javasdk.impl.replicatedentity.ReplicatedEntitiesImpl$EntityRunner.handleCommand(ReplicatedEntitiesImpl.scala:175)
[info] at kalix.javasdk.impl.replicatedentity.ReplicatedEntitiesImpl.$anonfun$runEntity$3(ReplicatedEntitiesImpl.scala:120)
[info] at akka.stream.impl.fusing.StatefulMapConcat$$anon$50.onPush(Ops.scala:2326)
[info] at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:542)
[info] at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:496)
[info] at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
[info] at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
[info] at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
[info] at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
[info] at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
[info] at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:787)
[info] at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
[info] at akka.actor.Actor.aroundReceive(Actor.scala:537)
[info] at akka.actor.Actor.aroundReceive$(Actor.scala:535)
[info] at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
[info] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
[info] at akka.actor.ActorCell.invoke(ActorCell.scala:547)
[info] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:231)
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
[info] at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
[info] at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
[info] at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
[info] at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
[info] at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
[info] Caused by: java.lang.ClassCastException: class kalix.javasdk.impl.replicatedentity.ReplicatedCounterMapImpl cannot be cast to class kalix.scalasdk.replicatedentity.ReplicatedCounterMap (kalix.javasdk.impl.replicatedentity.ReplicatedCounterMapImpl and kalix.scalasdk.replicatedentity.ReplicatedCounterMap are in unnamed module of loader 'app')
[info] at com.hiive.post.domain.PostStatsRouter.handleCommand(PostStatsRouter.scala:18)
[info] at kalix.scalasdk.impl.replicatedentity.JavaReplicatedEntityRouterAdapter.handleCommand(ReplicatedEntityAdapters.scala:103)
[info] at kalix.javasdk.impl.replicatedentity.ReplicatedEntityRouter._internalHandleCommand(ReplicatedEntityRouter.scala:95)
[info] at kalix.javasdk.impl.replicatedentity.ReplicatedEntitiesImpl$EntityRunner.liftedTree1$1(ReplicatedEntitiesImpl.scala:176)
[info] ... 25 common frames omitted
[info] 22:13:46.854 [kalix-akka.actor.default-dispatcher-7] ERROR kalix.javasdk.impl.action.ActionsImpl - Failure during handling of command com.hiive.post.control.PostControllerService.GetPost
[info] io.grpc.StatusRuntimeException: UNKNOWN: Entity terminating
[info] at io.grpc.Status.asRuntimeException(Status.java:535)
[info] at akka.grpc.internal.UnaryCallAdapter.onClose(UnaryCallAdapter.scala:40)
[info] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
[info] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
[info] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
[info] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
[info] at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
[info] at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:833)
These are the relevant bits of the RE implementation:
class PostStats(context: ReplicatedEntityContext) extends AbstractPostStats {
private val log = LoggerFactory.getLogger("com.hiive.post.domain.PostStats")
final val StatTypeCodeLike = "L"
/**
 * Adds the requested amount to the "like" counter and acknowledges with an empty reply.
 *
 * @param curData             the current replicated counter map for this entity
 * @param increaseStatRequest the command carrying the amount to add (`count`)
 * @return an effect that updates the counter map and then replies with `Empty`
 */
def increaseLikes(
    curData: ReplicatedCounterMap[String],
    increaseStatRequest: api.IncreaseStatRequest): ReplicatedEntity.Effect[Empty] = {
  log.debug(s"Increasing likes from: $curData")
  val incremented = curData.increment(StatTypeCodeLike, increaseStatRequest.count)
  log.debug(s"Increasing likes to: $incremented")
  effects
    .update(incremented)
    .thenReply(Empty.defaultInstance)
}
/**
 * Builds the stats reply for a post from the replicated counter map.
 *
 * Each counter is read exactly once; a key that is absent from the map is reported as zero.
 * The value that is logged is the same value that is put into the reply.
 *
 * @param curData         the current replicated counter map for this entity
 * @param getStatsRequest the query; its `postId` is echoed back in the reply
 * @return an effect replying with the assembled `api.PostStats`
 */
def getStats(curData: ReplicatedCounterMap[String], getStatsRequest: api.GetStatsRequest): ReplicatedEntity.Effect[api.PostStats] = {
  // Reads a single counter, treating a missing key as zero.
  def counter(statTypeCode: String): Long = curData.get(statTypeCode).getOrElse(0L)

  // NOTE(review): every counter except views is narrowed from Long to Int to fit the reply
  // message; a value outside the Int range would wrap silently.
  val curStatTypeCodeLike = counter(StatTypeCodeLike).toInt
  log.debug("curStatTypeCodeLike: " + curStatTypeCodeLike)
  val curStatTypeCodeView = counter(StatTypeCodeView)
  log.debug("curStatTypeCodeView: " + curStatTypeCodeView)
  val curStatTypeCodeReply = counter(StatTypeCodeReply).toInt
  log.debug("curStatTypeCodeReply: " + curStatTypeCodeReply)
  val curStatTypeCodeRepost = counter(StatTypeCodeRepost).toInt
  log.debug("curStatTypeCodeRepost: " + curStatTypeCodeRepost)
  val curStatTypeCodeSave = counter(StatTypeCodeSave).toInt
  log.debug("curStatTypeCodeSave: " + curStatTypeCodeSave)
  val curStatTypeCodeBoost = counter(StatTypeCodeBoost).toInt
  log.debug("curStatTypeCodeBoost: " + curStatTypeCodeBoost)

  // Reuse the values read above instead of querying the map a second time.
  val stats = api.PostStats(
    getStatsRequest.postId,
    curStatTypeCodeLike,
    curStatTypeCodeView,
    curStatTypeCodeReply,
    curStatTypeCodeRepost,
    curStatTypeCodeSave,
    curStatTypeCodeBoost
  )
  effects.reply(stats)
}
The full RE definition:
syntax = "proto3";
package com.hiive.post.api;
import "google/protobuf/empty.proto";
import "kalix/annotations.proto";
// Service backed by the PostStats replicated entity. It exposes commands that
// adjust a post's counters and a query (GetStats) that returns them.
service PostStatsService {
// Kalix code generation settings: the entity state is a replicated counter map
// with string keys.
option (kalix.codegen) = {
replicated_entity: {
name: "com.hiive.post.domain.PostStats"
entity_type: "poststatss"
replicated_counter_map: {
key: "string"
}
}
};
// Commands that change a counter; each replies with an empty message.
rpc IncreaseLikes(IncreaseStatRequest) returns (google.protobuf.Empty);
rpc DecreaseLikes(DecreaseStatRequest) returns (google.protobuf.Empty);
rpc IncreaseViews(IncreaseStatRequest) returns (google.protobuf.Empty);
rpc IncreaseReplies(IncreaseStatRequest) returns (google.protobuf.Empty);
rpc IncreaseReposts(IncreaseStatRequest) returns (google.protobuf.Empty);
rpc IncreaseSaves(IncreaseStatRequest) returns (google.protobuf.Empty);
rpc IncreaseBoosts(IncreaseStatRequest) returns (google.protobuf.Empty);
// Query returning the counters for a post.
rpc GetStats(GetStatsRequest) returns (PostStats);
}
// Reply of GetStats: the counters for a single post.
message PostStats {
// Id of the post these counters belong to.
string postId = 1;
// Declared as sint32, unlike the other 32-bit counters which are int32.
sint32 likeCount = 2;
// The only 64-bit counter in this message.
int64 viewCount = 3;
int32 replyCount = 4;
int32 repostCount = 5;
int32 saveCount = 6;
int32 boostCount = 7;
}
// Request for GetStats.
message GetStatsRequest {
// Post id; marked as the entity key.
string postId = 1 [(kalix.field).entity_key = true];
}
// Request shared by all Increase* commands.
message IncreaseStatRequest {
// Post id; marked as the entity key.
string postId = 1 [(kalix.field).entity_key = true];
// Amount carried by the command (signed, sint32 encoding).
sint32 count = 2;
}
// Request for the DecreaseLikes command.
message DecreaseStatRequest {
// Post id; marked as the entity key.
string postId = 1 [(kalix.field).entity_key = true];
// Amount carried by the command (signed, sint32 encoding).
sint32 count = 2;
}
And finally the 2 relevant controller methods:
/**
 * Spends one of the caller's coins, then increases the post's like counter and records the like.
 *
 * The reply is only produced after all three downstream calls have completed, so a failure in
 * any of them is reported to the caller as an error effect instead of being lost.
 *
 * @param likePostRequest the post to like and the number of likes to add
 * @return an async effect replying with `Empty` on success, or an error effect when the JWT has
 *         no subject claim or any downstream call fails
 */
override def likePost(likePostRequest: LikePostRequest): Action.Effect[Empty] =
  actionContext.metadata.jwtClaims.subject match {
    case None =>
      // Without a subject there is no user to charge the coin to.
      effects.error("Missing subject claim in JWT")

    case Some(userId) =>
      val increaseLikesRequest = IncreaseStatRequest(likePostRequest.postId, likePostRequest.likeCount)
      // The calls are chained so that each one only starts after the previous one succeeded,
      // and the resulting future completes only when the last one has finished.
      val effect = (for {
        _ <- components.userCoinjar.useCoin(UseCoinRequest(userId)).execute()
        _ = log.debug("Coin was used, increasing likes stats and creating like")
        _ <- components.postStats.increaseLikes(increaseLikesRequest).execute()
        _ <- components.likePost.like(convertToAPI(likePostRequest, userId)).execute()
      } yield effects.reply(Empty())).recover {
        case scala.util.control.NonFatal(error) =>
          // Some exceptions carry no message; fall back to their string form.
          effects.error(Option(error.getMessage).getOrElse(error.toString))
      }
      effects.asyncEffect(effect)
  }
/**
 * Loads a post together with its stats, its author and its accepted replies (each reply
 * hydrated with its own author and stats) and replies with the combined `Post`.
 *
 * All lookups are composed as futures and returned as an async effect, so no thread is blocked
 * while waiting for the other components. A failure in any lookup fails the whole effect.
 *
 * @param getPostRequest the request carrying the id of the post to load
 * @return an async effect replying with the hydrated `Post`
 */
override def getPost(getPostRequest: GetPostRequest): Action.Effect[Post] = {
  // Start the two independent lookups up front so they run concurrently.
  val apiPostFut = components.post.get(GetPostRequest(getPostRequest.id)).execute()
  val apiPostStatsFut = components.postStats.getStats(GetStatsRequest(getPostRequest.id)).execute()

  val hydratedPostFut = for {
    apiPost <- apiPostFut
    // The author lookup only needs the post, so start it before fetching the replies.
    apiPostUserFut = components.user.get(GetUserRequest(apiPost.userId)).execute()
    apiPostReplies <- Future.sequence(apiPost.acceptedRepliesPostIds.map { replyPostId =>
      components.post.get(GetPostRequest(replyPostId)).execute()
    })
    hydratedReplies <- Future.sequence(apiPostReplies.map { replyPost =>
      // Start both lookups for a reply before combining them, so they overlap.
      val replyUserFut = components.user.get(GetUserRequest(replyPost.userId)).execute()
      val replyStatsFut = components.postStats.getStats(GetStatsRequest(replyPost.id)).execute()
      for {
        replyStats <- replyStatsFut
        replyUser <- replyUserFut
      } yield convertFromAPI(replyPost, replyUser.username, replyStats, Seq.empty[Post])
    })
    apiPostStats <- apiPostStatsFut
    user <- apiPostUserFut
  } yield convertFromAPI(apiPost, user.username, apiPostStats, hydratedReplies)

  effects.asyncEffect(hydratedPostFut.map(post => effects.reply(post)))
}