2017-10-20 4 views
0

私は非常にシンプルなKafkaStreamsアプリを持っています。KafkaStreamsのローカルステートストレージ

input topic --> extract smth., update aggregate in the local state -> output topic 

入力トピックには1つのパーティションしかなく、すべてがスムーズに機能しました。

しかし、入力トピックのパーティション数を増やした後、私のアプリケーションはパーティションごとにインスタンス化されるので、出力トピックには単一の更新ではなく複数の更新(パーティションごとに1つの更新)が行われます。

どうすればこのような状況に対処できますか?私のアプリケーションでは、すべての入力パーティションの集約が1つしか生成されないようにしたい。

答えて

2

Kafka Streamsはパーティション単位で並列化されているため、パーティション化された単一のトピックを使用してストリームを並列化できます。

入力トピックのトピック数を制御できない場合は、単一のパーティションで中間トピックを作成し、このパーティションを通じてすべてのデータをルーティングできます。

KStream multiPartitionInputStream = ... 
multiPartitionInputStream.through("single-partitioned-topic")... 

注:グローバル集約を行うことは水平にスケールしない、したがって、このパターンは注意して使用する必要があります。

更新:

プロセッサAPIのユーザーのために、あなたはまた、単一のタスクを作成し、このタスクに両方の/すべてのパーティションを割り当てて設定"partition.grouper"経由でカスタムPartitionGrouperを提供することができます。

注:1が正しいPartitionGrouperを書くために多くの内部詳細と前提を理解する必要があるため、DSLを使用するためのカスタムPartitionGrouperを提供することは非常に落胆です。

関連する問題