をintercetors.classesではありませんので、私はあなたがその間に解決策を見つけたと仮定します。しかし、他の人を助ける場合には、メッセージの内容に基づいて異なるトピックにメッセージをディスパッチする私のProducerInterceptor
クラスは、ストリームに指定された出力が既に存在しない限り呼び出されませんでした。
私が最初に試したのは、出力トピックを指定する必要がないと思ったからです。これは動作しません:
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
しかし、この処理が行われます。
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic").through("dummy-output-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
それは何も第二の例ではそのdummy-output-topic
に公開しないされることを注目に値します、とto
代わりのthrough
を使用しても動作するように表示されていること同じ方法。
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")
.map(new CustomKeyValueMapper)
.through("dummy-output-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
私はこれらの例には、誰もが作業を支援願っています:私のコードは、実際より次のようになりますので、私の場合は
は、私は、さまざまなトピックにそれらをディスパッチするインターセプタを使用する前に、レコードを変更するmap
を呼び出しましたProducerInterceptor
さんは私と同じ間違いをしました。
返信いただきありがとうございます。はい、producer.propertiesのinterceptor.classesとして正しく記述されています。入力ミスを申し訳ありません。 –