2016-11-28 6 views
1

2つのトピック(外部結合を使用してマージ)から得られるデータを再配列する必要があります。 StateStoreを使用して、最新のシーケンスを保持し、再シーケンスされたメッセージでダウンストリームのストリーム値を変更することをお勧めします。StateStoreSupplier KafkaStreamsにシーケンスを保存する

簡素化問題:

(トピックAから配列、トピックBからSEQ) -

(10,100)(StateStoreにおける現在のシーケンスを維持する)を出力する>新規配列 - > 1

(11101) - > 2

(12102) - > 3

(...、...) - > ...

新しいシーケンスは、キー "currentSeq"の値としてstateStoreに格納されます。シーケンスは各メッセージで増分され、stateStoreに戻されます。

+0

を参照してください。しかし、 'process()'や 'transform()'の中で 'StateStore'を使うことは絶対に可能です。そして、好きな場所に何かを保管することができます。 –

+0

@ MatthiasJ.Sax重要なのは、下流のプロセスで新しいシーケンス番号を「吐き出す」必要があることです。そして、私はStateStoreからシーケンス番号を取得し、それをインクリメントして、次のメッセージのためにストアに戻したいと思います。いくつかの条件では、シーケンス番号が1にリセットされることがあります。 – Tony

答えて

2

登録済み(おそらくカスタム)の状態でプロセッサAPIを使用する必要があります。

process(),transform()またはtransformValue()を使用してプロセッサAPIをDSLとミックスアンドマッチし、状態ストア(名前で)を参照することもできます。

は、私は完全に従うことができない

関連する問題