StateStoreとやり取りしてメッセージをフィルタ処理して複雑なロジックを実行するプロセッサがあります。 process(key,value)
メソッドでは、私はcontext.forward(key,value)
を使用して、必要なキーと値を送信します。デバッグ目的のために、私もそれらを印刷します。Kafka Stream DSLを使用してプロセッサでキーと値をフィルタリングする方法
私は2つの他のストリームのジョインした結果KStream mergedStream
を持っています。私はそのストリームのレコードにプロセッサを適用したい。私はこのプログラムを起動すると、私は私のコンソールに出力するための適切な値を見ることができますmergedStream.process(myprocessor,"stateStoreName")
:私はこれを達成します。しかし、私がmergedStream.to("topic")
を使ってトピックにmergedStreamを送信した場合、そのトピックの値は、プロセッサで転送された値ではなく、元の値になります。
私はkafka-streams 0.10.1.0を使用します。
私は別のストリームへのプロセッサに転送された値を取得するための最良の方法は何ですか?
それはKStream DSLによって作成されたストリームとProcessor APIを混在させることは可能ですか?