Protobuf To Kafka Topic

Can you provide an example of publishing protobuf to a Confluent Kafka topic, please, including customisation of the key?

The doc snippets only show consumption of CloudEvents.

There is nothing special to do to publish protobuf. A reply effect from an action method annotated with:

option (kalix.method).eventing.out = {
  topic: "my-topic"
}

will serialize the returned protobuf messages to binary and add cloud event metadata to the event identifying the message type (content-type will be application/protobuf and ce-type will contain the fully qualified protobuf message name).

Kalix will use the cloud event subject attribute of the message as the Kafka message key, if present. If the action is subscribing to the event stream of an entity, the entity key is passed along as the subject.

The same goes if the event reaches the action method from another Kafka topic: if it already has a cloud event subject attribute, that is passed along to the emitted event when publishing the result.

You can also explicitly set the subject as a metadata entry on the reply/effect, using the ce-subject key, with the metadata/cloud event APIs of the SDKs. I see we do not have any samples of that yet.

In the Java/Scala SDKs it would be something like:

effects().reply(myReply, Metadata.EMPTY.set("ce-subject", myReply.id));

In the Typescript/Javascript SDK:

return replies.message(myReply, (new Metadata()).set("ce-subject", myReply.id));

Thanks, here’s my implementation:

  public Effect<TopicsActionApi.MyStateUpdated> onUpdateState(MyDomain.MyState myState) {
    String key = myState.getId();
    var metadata = actionContext().metadata().set("ce-subject", key);
    TopicsActionApi.MyStateUpdated myStateUpdated = TopicsActionApi.MyStateUpdated.newBuilder()
            .setId(myState.getId())
            .setValue(myState.getValue())
            .build();
    return effects().reply(myStateUpdated, metadata);
  }

Note that it is not correct in general to pass all incoming metadata along with the response (from actionContext().metadata()): it includes message attributes like the cloud event id, content-type and ce-type of the incoming message, which should not be copied onto your outgoing message. Better to only set the ce-subject field like I suggested.
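A minimal sketch of the corrected handler, assuming the same generated MyDomain/TopicsActionApi classes and the Kalix Java SDK Metadata class, could look something like:

  public Effect<TopicsActionApi.MyStateUpdated> onUpdateState(MyDomain.MyState myState) {
    String key = myState.getId();
    // Start from empty metadata and set only the cloud event subject;
    // id, type, content-type etc. are generated fresh for the outgoing message.
    var metadata = Metadata.EMPTY.set("ce-subject", key);
    TopicsActionApi.MyStateUpdated myStateUpdated = TopicsActionApi.MyStateUpdated.newBuilder()
            .setId(myState.getId())
            .setValue(myState.getValue())
            .build();
    return effects().reply(myStateUpdated, metadata);
  }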

Ok, thanks for clarifying. I updated it to set only the ce-subject.

I’ve noticed that the serialization does not seem to be working correctly. Are there additional configurations that are required? In either Kalix or Confluent?

Here is a sample of the message on Confluent, note the special characters within the message.

26dabec3-4733-4a39-96a7-1d2ccf3768aee2022-07-07T20:20:46.671651Ze2022-07-07T20:20:46.671651Z"STATUS_UNKNOWN*WBC2BANKER

The message example shows a text string, and in the case of a protobuf message we would expect a protobuf payload.
Are you sure this message originated from Kalix?

Could you also share your action protobuf?

The special character is WBC2BANKER?

Can you share the Kafka message headers (most interesting is content-type and ce-type)?

syntax = "proto3";

package com.example.deal.action;

import "kalix/annotations.proto";
import "com/example/deal/domain/deal_domain.proto";
import "com/example/deal/api/deal_api.proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";

option java_outer_classname = "TopicsActionApi";

message DealStateUpdated {
  string dealId = 1;
  string createdTimestamp = 2;
  string modifiedTimestamp = 3;
  string lifecycleStatus = 4;
  string brand = 5;
  string channel = 6;
}

service TopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  // Uncomment to change return type to Any for JSON
  //rpc OnUpdateState(com.example.deal.domain.DealState) returns (google.protobuf.Any) {
  rpc OnUpdateState(com.example.deal.domain.DealState) returns (com.example.deal.action.DealStateUpdated) {
    option (kalix.method).eventing.in = {
      value_entity: "deals"
    };
    option (kalix.method).eventing.out = {
      topic: "deal"
    };
  }
}

[
  { "key": "ce-source", "stringValue": "OnUpdateState" },
  { "key": "ce-type", "stringValue": "com.example.deal.action.DealStateUpdated" },
  { "key": "ce-time", "stringValue": "2022-07-06T20:56:58.021461726Z" },
  { "key": "ce-specversion", "stringValue": "1.0" },
  { "key": "ce-subject", "stringValue": "49865d54-a5d8-45a6-aa8b-889d8d685a60" },
  { "key": "ce-id", "stringValue": "2f6fd635-be15-4427-85a6-7bb240167068" },
  { "key": "ce-datacontenttype", "stringValue": "application/protobuf" }
]

Ok, that looks like correct metadata for a protobuf message, which means the payload is protobuf encoded bytes. Looking at the payload in a UI that expects a UTF-8 encoded string may therefore not work, if that is what you are doing.

Do you also see that the character is wrong if you decode the payload into a protobuf message and look at the field values of that message?

The UI is the Confluent UI showing the messages received on the topic. If I try to process the message in Confluent I get a deserialization error.

Also, when I try to consume the topic in Kalix, I can see the messages being consumed in the Confluent dashboard but no data is stored in the entity store.

When consuming a message from Kafka, Kalix will look at the cloud event content-type and ce-type to know how to deserialize it. The actual deserialization happens in the user function/SDK, so if that does not work you should see the error/exception logged from your deployed service. Do you see anything in your logs that gives a hint about the problem?

The Kafka offset is only committed on successful reply after handling the event in your entity, so if there is such an error the topic should get stuck rather than move forward.
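For reference, a rough sketch of what a topic-subscribing handler could look like on the Kalix side (the handler name is hypothetical, and forwarding into your entity is only indicated by a comment):

  // Hypothetical handler for an rpc annotated with:
  //   option (kalix.method).eventing.in = { topic: "deal" };
  // Kalix deserializes the Kafka payload into DealStateUpdated based on the
  // ce-type header before invoking this method, so a deserialization failure
  // would show up as an error in the service logs.
  public Effect<Empty> onDealStateUpdated(TopicsActionApi.DealStateUpdated event) {
    // The Kafka message key / cloud event subject is available via the metadata.
    Optional<String> subject = actionContext().metadata().asCloudEvent().subject();

    // ... forward the event to your entity here; only a successful reply lets
    // the Kafka offset move forward.
    return effects().reply(Empty.getDefaultInstance());
  }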

Here is the stream processing log output on Confluent. Looks like it is expecting a particular encoding.

SELECT
  encode(message->deserializationError->RECORDB64, 'base64', 'utf8') AS message,
  message->deserializationError->cause AS deserializationErrorCause,
  message->deserializationError->errorMessage AS deserializationErrorMessage,
  message->recordProcessingError->cause AS recordProcessingErrorCause,
  message->recordProcessingError->record AS recordProcessingErrorRecord,
  message->recordProcessingError->errorMessage AS recordProcessingErrorErrorMessage,
  message->serializationError->cause AS serializationErrorCause,
  message->serializationError->record AS serializationErrorRecord,
  message->serializationError->errorMessage AS serializationErrorErrorMessage,
  message->productionError->errorMessage AS productionErrorErrorMessage,
  message->kafkaStreamsThreadError->cause AS kafkaStreamsThreadErrorCause,
  message->kafkaStreamsThreadError->threadName AS kafkaStreamsThreadErrorThreadName,
  message->kafkaStreamsThreadError->errorMessage AS kafkaStreamsThreadErrorErrorMessage
FROM KSQL_PROCESSING_LOG
EMIT CHANGES;

With protobuf encoded binary messages I think you will need to specifically deserialize them as protobuf using your message descriptors if you want to consume them on the Confluent side. I’m not sure about the exact steps though.

If the consumer is a Kalix component the deserialization should work as expected as long as the action/view annotated with eventing in from the topic accepts all the protobuf message types the topic may contain.

If you do not want binary protobuf messages but rather some text representation or JSON you will have to change what you emit from your Kalix method. You can find some more details about that in the docs here: Serialization options for Java and Scala services :: Kalix Documentation
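For illustration, a rough sketch of the JSON approach could look like the following. It assumes the SDK’s JsonSupport.encodeJson helper described in those docs and a google.protobuf.Any return type (as in the commented-out rpc in your proto); MyStateUpdatedJson is just a hypothetical Jackson-serializable class, not generated code:

  public Effect<Any> onUpdateState(MyDomain.MyState myState) {
    MyStateUpdatedJson payload = new MyStateUpdatedJson(myState.getId(), myState.getValue());
    // encodeJson wraps the JSON bytes in an Any, so the topic carries a JSON
    // payload instead of binary protobuf.
    return effects().reply(
        JsonSupport.encodeJson(payload),
        Metadata.EMPTY.set("ce-subject", myState.getId()));
  }

  // Hypothetical Jackson-serializable payload class.
  public static class MyStateUpdatedJson {
    public final String id;
    public final String value;

    public MyStateUpdatedJson(String id, String value) {
      this.id = id;
      this.value = value;
    }
  }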

Can you confirm how, under the covers, Kalix prepares the messages for Kafka? Does it use org.apache.kafka.common.serialization, and does it implement the Kafka producer based off the Kafka broker config file? Where can one access the properties to specifically set the serialiser/deserializer for keys and values?

Below is the KSQL DB stream I created with the following query. The protobuf deal object flows in and the Confluent console just shows it without deserialising the binary data; the console display behaviour is the same as if the data was in Avro binary or any format other than JSON:

CREATE STREAM deal_streams (dealIdx VARCHAR, brand VARCHAR)
WITH (kafka_topic='deal_changes_proto3', partitions=6, value_format='PROTOBUF_NOSR');

Since protobuf records on the wire do not carry a schema, there is no need to register a schema in the schema registry, but that is not disallowed and is left to the devs as an option. Instead you can pass PROTOBUF_NOSR (no schema registry) when creating KSQL streams, and the KSQL engine will unwrap the schema by itself from the protobuf object; see ksqlDB Serialization Formats - ksqlDB Documentation.

Once you have created your KSQL stream, if you run the query below you should see KSQL working properly, selecting dealIdx and brand from the topic data:

SELECT * FROM deal_streams EMIT CHANGES;

Then the Confluent console starts showing the emitted dealIdx and brand values.

My answer above is based on the discussion regarding KSQL stream processing on Confluent after Kalix produces protobuf-serialised records in the Kafka topic.

So at this stage there should ideally not be any issues with downstream processing inside KSQL for Kalix-emitted protobuf records in Kafka topics.