2016-11-26 3 views
3

StateStoreとやり取りしてメッセージをフィルタ処理して複雑なロジックを実行するプロセッサがあります。 process(key,value)メソッドでは、私はcontext.forward(key,value)を使用して、必要なキーと値を送信します。デバッグ目的のために、私もそれらを印刷します。Kafka Stream DSLを使用してプロセッサでキーと値をフィルタリングする方法

私は2つの他のストリームのジョインした結果KStream mergedStreamを持っています。私はそのストリームのレコードにプロセッサを適用したい。私はこのプログラムを起動すると、私は私のコンソールに出力するための適切な値を見ることができますmergedStream.process(myprocessor,"stateStoreName")

:私はこれを達成します。しかし、私がmergedStream.to("topic")を使ってトピックにmergedStreamを送信した場合、そのトピックの値は、プロセッサで転送された値ではなく、元の値になります。

私はkafka-streams 0.10.1.0を使用します。

私は別のストリームへのプロセッサに転送された値を取得するための最良の方法は何ですか?

それはKStream DSLによって作成されたストリームとProcessor APIを混在させることは可能ですか?

答えて

9

ショート:

あなたも、あなたはDSL内のプロセッサのAPIにアクセスすることができますtransform(...)の代わりprocess(...)を使用することができますあなたの問題を解決するために。

ロング

あなたがストリームにプロセッサを適用するprocess(...)使用する場合 - しかし、これは「終了」である(またはシンク)の操作(その戻り値の型がvoidである)、すなわち、それはありません(ここでの唯一のオペレータが何の後継者を持っていないことを意味し、「シンク」! - それはどんな結果がどこかに書かれていることを意味するものではない)任意の結果を返さない

さらに、あなたは基本的にmergedStream.to(...)mergedStream.process(...)を呼び出している場合に分岐・アンド・あなたのストリームを複製し、それぞれの下流の演算子に1つのコピーを送ります(つまり、process a 1コピーをtoにしてください。

DSLとプロセッサAPIを混在させることは絶対に可能です(既に行っています;))。ただし、process(...)を使用すると、DSL内でforward(...)のデータをコンシューマできません。プロセッサAPIの結果を使用する場合は、process(...)の代わりにtransform(...)を使用できます。

関連する問題