2

私はトポロジービルダーでKafkaのStreams APIを使用しています。 あるデータ型を別のデータ型に変換できるプロセッサを、パイプライン内の次のプロセッサが使用できるようにするには、どうすればいいのか教えてください。Kafka Streams:データ型の変換方法は?

[topic]--(string)-->[processor: parse json]--(object)-->[processor 2]--(object)-->[sink] 

任意のアイデア:シンプルなユースケースとして

答えて

2

の値をKafkaのトピックでStringからJSONに変換するとします。

  • JSON(またはPOJO)に文字列を変換するためのコード:

    あなたは2つの部分のみを必要としています。ここで必要なものを選んでください。これを簡単にするためにいくつかのJavaライブラリがあります。

  • Kafka Streamsでは、ソーストピックから読み取るために(1)Stringの値serdeを定義し、(2)JSONデータ(またはPojo)を宛先トピックに書き込むための対応する値serdeを定義します。 Serdesは、必要な時/場所でデータを実体化する必要があります(たとえば、カフカにPojosを書くにはマテリアライゼーションが必要です)。

具体例については、https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageviewのサンプルコードを参照してください。 Apache KafkaのStreams APIでJSONを使用します。

関連する問題