Weird error when retrieving replicated entity

I’ve reviewed my config and don’t see anything that differs from docs. The strange thing is that I can increase the counter multiple times (it’s a replicated_counter_map) and there’s no errors, but when I try to retrieve it that’s when it fails with a ClassCastException.

Here’s some debug logging done of the ops that took place, then the error thrown by the RE itself and the controller action that was trying to query it:

[info] 22:12:51.285 [kalix-akka.actor.default-dispatcher-10] DEBUG com.hiive.post.domain.LikePost - liking post bf7ef948-4042-42b1-b863-d29439821d15 by 2 likes
[info] 22:12:51.285 [kalix-akka.actor.default-dispatcher-10] DEBUG com.hiive.post.domain.LikePost - Saving like LikePostState(,aad6e03a-c3cd-4c3d-85af-fbc5e1d7b25c,,2,Some(Timestamp(1669183971,0,UnknownFieldSet(Map()))),Map(1669183971 -> 2),UnknownFieldSet(Map()))


[info] 22:12:52.018 [kalix-akka.actor.default-dispatcher-6] DEBUG com.hiive.post.domain.PostStats - Increasing likes from: kalix.scalasdk.replicatedentity.ReplicatedCounterMap@4cc95b89
[info] 22:12:52.018 [kalix-akka.actor.default-dispatcher-6] DEBUG com.hiive.post.domain.PostStats - Increasing likes to: kalix.scalasdk.replicatedentity.ReplicatedCounterMap@2e38287c


[info] 22:13:46.813 [kalix-akka.actor.default-dispatcher-10] ERROR kalix.scalasdk.impl.replicatedentity.JavaReplicatedEntityAdapter - Terminating entity [bf7ef948-4042-42b1-b863-d29439821d15] due to unexpected failure for command [GetStats]
[info] kalix.javasdk.impl.EntityExceptions$EntityException: Unexpected failure: java.lang.ClassCastException: class kalix.javasdk.impl.replicatedentity.ReplicatedCounterMapImpl cannot be cast to class kalix.scalasdk.replicatedentity.ReplicatedCounterMap (kalix.javasdk.impl.replicatedentity.ReplicatedCounterMapImpl and kalix.scalasdk.replicatedentity.ReplicatedCounterMap are in unnamed module of loader 'app')
[info]  at kalix.javasdk.impl.EntityExceptions$EntityException$.apply(EntityExceptions.scala:46)
[info]  at kalix.javasdk.impl.replicatedentity.ReplicatedEntitiesImpl$EntityRunner.liftedTree1$1(ReplicatedEntitiesImpl.scala:179)
[info]  at kalix.javasdk.impl.replicatedentity.ReplicatedEntitiesImpl$EntityRunner.handleCommand(ReplicatedEntitiesImpl.scala:175)
[info]  at kalix.javasdk.impl.replicatedentity.ReplicatedEntitiesImpl.$anonfun$runEntity$3(ReplicatedEntitiesImpl.scala:120)
[info]  at akka.stream.impl.fusing.StatefulMapConcat$$anon$50.onPush(Ops.scala:2326)
[info]  at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:542)
[info]  at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:496)
[info]  at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
[info]  at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
[info]  at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
[info]  at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
[info]  at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
[info]  at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:787)
[info]  at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
[info]  at akka.actor.Actor.aroundReceive(Actor.scala:537)
[info]  at akka.actor.Actor.aroundReceive$(Actor.scala:535)
[info]  at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
[info]  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
[info]  at akka.actor.ActorCell.invoke(ActorCell.scala:547)
[info]  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
[info]  at akka.dispatch.Mailbox.run(Mailbox.scala:231)
[info]  at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
[info]  at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
[info]  at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
[info]  at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
[info]  at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
[info]  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
[info] Caused by: java.lang.ClassCastException: class kalix.javasdk.impl.replicatedentity.ReplicatedCounterMapImpl cannot be cast to class kalix.scalasdk.replicatedentity.ReplicatedCounterMap (kalix.javasdk.impl.replicatedentity.ReplicatedCounterMapImpl and kalix.scalasdk.replicatedentity.ReplicatedCounterMap are in unnamed module of loader 'app')
[info]  at com.hiive.post.domain.PostStatsRouter.handleCommand(PostStatsRouter.scala:18)
[info]  at kalix.scalasdk.impl.replicatedentity.JavaReplicatedEntityRouterAdapter.handleCommand(ReplicatedEntityAdapters.scala:103)
[info]  at kalix.javasdk.impl.replicatedentity.ReplicatedEntityRouter._internalHandleCommand(ReplicatedEntityRouter.scala:95)
[info]  at kalix.javasdk.impl.replicatedentity.ReplicatedEntitiesImpl$EntityRunner.liftedTree1$1(ReplicatedEntitiesImpl.scala:176)
[info]  ... 25 common frames omitted


[info] 22:13:46.854 [kalix-akka.actor.default-dispatcher-7] ERROR kalix.javasdk.impl.action.ActionsImpl - Failure during handling of command com.hiive.post.control.PostControllerService.GetPost
[info] io.grpc.StatusRuntimeException: UNKNOWN: Entity terminating
[info]  at io.grpc.Status.asRuntimeException(Status.java:535)
[info]  at akka.grpc.internal.UnaryCallAdapter.onClose(UnaryCallAdapter.scala:40)
[info]  at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
[info]  at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
[info]  at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
[info]  at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
[info]  at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
[info]  at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
[info]  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]  at java.base/java.lang.Thread.run(Thread.java:833)

This is the relevant bits of the RE implementation:

class PostStats(context: ReplicatedEntityContext) extends AbstractPostStats {

  private val log = LoggerFactory.getLogger("com.hiive.post.domain.PostStats")

  final val StatTypeCodeLike = "L"

  def increaseLikes(curData: ReplicatedCounterMap[String], increaseStatRequest: 
  api.IncreaseStatRequest): ReplicatedEntity.Effect[Empty] = {
    log.debug("Increasing likes from: " + curData)
    val updatedData = curData.increment(StatTypeCodeLike, increaseStatRequest.count)
    log.debug("Increasing likes to: " + updatedData)
    effects.update(updatedData).thenReply(Empty.defaultInstance)
  }

  def getStats(curData: ReplicatedCounterMap[String], getStatsRequest: api.GetStatsRequest): ReplicatedEntity.Effect[api.PostStats] = {

    val curStatTypeCodeLike = curData.get(StatTypeCodeLike).getOrElse(0L).toInt
    log.debug("curStatTypeCodeLike: " + curStatTypeCodeLike)
    val curStatTypeCodeView = curData.get(StatTypeCodeView).getOrElse(0L)
    log.debug("curStatTypeCodeView: " + curStatTypeCodeView)
    val curStatTypeCodeReply = curData.get(StatTypeCodeReply).getOrElse(0L).toInt
    log.debug("curStatTypeCodeReply: " + curStatTypeCodeReply)
    val curStatTypeCodeRepost = curData.get(StatTypeCodeRepost).getOrElse(0L).toInt
    log.debug("curStatTypeCodeRepost: " + curStatTypeCodeRepost)
    val curStatTypeCodeSave = curData.get(StatTypeCodeSave).getOrElse(0L).toInt
    log.debug("curStatTypeCodeSave: " + curStatTypeCodeSave)
    val curStatTypeCodeBoost = curData.get(StatTypeCodeBoost).getOrElse(0L).toInt
    log.debug("curStatTypeCodeBoost: " + curStatTypeCodeBoost)

    val stats = api.PostStats(
      getStatsRequest.postId,
      curData.get(StatTypeCodeLike).getOrElse(0L).toInt,
      curData.get(StatTypeCodeView).getOrElse(0L),
      curData.get(StatTypeCodeReply).getOrElse(0L).toInt,
      curData.get(StatTypeCodeRepost).getOrElse(0L).toInt,
      curData.get(StatTypeCodeSave).getOrElse(0L).toInt,
      curData.get(StatTypeCodeBoost).getOrElse(0L).toInt,
    )
    effects.reply(stats)
  }

The full RE definition:

syntax = "proto3";

package com.hiive.post.api;

import "google/protobuf/empty.proto";
import "kalix/annotations.proto";

service PostStatsService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.hiive.post.domain.PostStats"
      entity_type: "poststatss"
      replicated_counter_map: {
        key: "string"
      }
    }
  };

  rpc IncreaseLikes(IncreaseStatRequest) returns (google.protobuf.Empty);
  rpc DecreaseLikes(DecreaseStatRequest) returns (google.protobuf.Empty);
  rpc IncreaseViews(IncreaseStatRequest) returns (google.protobuf.Empty);
  rpc IncreaseReplies(IncreaseStatRequest) returns (google.protobuf.Empty);
  rpc IncreaseReposts(IncreaseStatRequest) returns (google.protobuf.Empty);
  rpc IncreaseSaves(IncreaseStatRequest) returns (google.protobuf.Empty);
  rpc IncreaseBoosts(IncreaseStatRequest) returns (google.protobuf.Empty);
  rpc GetStats(GetStatsRequest) returns (PostStats);
}

message PostStats {
  string postId = 1;
  sint32 likeCount = 2;
  int64 viewCount = 3;
  int32 replyCount = 4;
  int32 repostCount = 5;
  int32 saveCount = 6;
  int32 boostCount = 7;
}

message GetStatsRequest {
  string postId = 1 [(kalix.field).entity_key = true];
}

message IncreaseStatRequest {
  string postId = 1 [(kalix.field).entity_key = true];
  sint32 count = 2;
}

message DecreaseStatRequest {
  string postId = 1 [(kalix.field).entity_key = true];
  sint32 count = 2;
}

And finally the 2 relevant controller methods:

  override def likePost(likePostRequest: LikePostRequest): Action.Effect[Empty] = {
    val userId = actionContext.metadata.jwtClaims.subject.get
    val coinUsedFut = components.userCoinjar.useCoin(UseCoinRequest(userId)).execute()
    val effect = coinUsedFut.map { _ =>
      log.debug("Coin was used, increasing likes stats and creating like")
      val increaseLikesRequest = IncreaseStatRequest(likePostRequest.postId, likePostRequest.likeCount)
      for {
        addToStatsCall <- components.postStats.increaseLikes(increaseLikesRequest).execute()
        createLikeCall <- components.likePost.like(convertToAPI(likePostRequest, userId)).execute()
      } yield Empty()

      effects.reply(Empty())
    }.recover { error =>
      effects.error(error.getMessage)
    }
    effects.asyncEffect(effect)
  }


  override def getPost(getPostRequest: GetPostRequest): Action.Effect[Post] = {

    val apiPostFut = components.post.get(GetPostRequest(getPostRequest.id)).execute()
    val apiPostStatsFut = components.postStats.getStats(GetStatsRequest(getPostRequest.id)).execute()
    val apiPost = Await.result(apiPostFut, Duration.create("2 sec"))
    val apiPostStats = Await.result(apiPostStatsFut, Duration.create("2 sec"))

    val apiPostUserFut = components.user.get(GetUserRequest(apiPost.userId)).execute()

    val apiPostRepliesFuts = apiPost.acceptedRepliesPostIds.map{ replyPostId =>
      components.post.get(GetPostRequest(replyPostId)).execute()
    }
    val apiPostReplies = Await.result(Future.sequence(apiPostRepliesFuts), Duration.create("2 sec"))

    val hydratedPostFuts = apiPostReplies.map { replyPost =>
      val replyUserFut = components.user.get(GetUserRequest(replyPost.userId)).execute()
      components.postStats.getStats(GetStatsRequest(replyPost.id)).execute().map { replyStats =>
        val replyUser = Await.result(replyUserFut, Duration.create("2 sec"))
        convertFromAPI(replyPost, replyUser.username, replyStats, Seq.empty[Post])
      }
    }
    val hydratedReplies = Await.result(Future.sequence(hydratedPostFuts), Duration.create("2 sec"))
    val user = Await.result(apiPostUserFut, Duration.create("2 sec"))

    effects.reply(convertFromAPI(apiPost, user.username, apiPostStats, hydratedReplies))
  }

Hi @maristi,

I’ve tried to reproduced it and strange enough I could only reproduce it once. After restarting the service the problem went away.

I run it on my local machine and started the service with sbt run.

I suspect that it has something to do with class loading, but that won’t explain why the Increase command succeeded and the Get failed. That doesn’t make sense to me.

Could you tell how did you produce that error? Was that on your local machine as well?

Cheers,

Renato

Thanks Renato. Yeah was happening locally but deployed and also happens there (see below).

It’s very inconsistent, I haven’t been able to discern the pattern, and have tried about 20 times. Sometimes I do the increment and then the get a couple times and it’s fine, then I’ll get one more time and it fails.

Support replied that they think it’s a bug. I’m hoping they figure it out because I’m at a loss here.

We are investigating it.

I created a ticket on GitHub in case you want to follow it here is the link:

Thanks very much Renato. I’ll keep an eye on the issue. Glad that you’ve been able to reproduce it reliably now. -m