2017-06-26 6 views
1

多くの場合、いくつかのメッセージの集約を含むGlobalKTableが必要です。今、私の単一のインスタンスKTableセットアップは次のようになります。各インスタンスが唯一ではないが受信したメッセージのすべてのために、受信したメッセージの更新された指標を、持っているので、Kafka Streams - グローバルメトリック集計の方法

final KTable<String, Double> aggregatedMetrics = eventStream 
     .groupByKey(Serdes.String(), jsonSerde) 
     .aggregate(
       () -> 0d, 
       new MetricsAggregator(), 
       Serdes.Double(), 
       LOCAL_METRICS_STORE_NAME); 

明らかに、これはスケールしません他のすべてのインスタンス。

final KStreamBuilder builder = new KStreamBuilder(); 
builder.globalTable(METRIC_CHANGES_TOPIC, METRICS_STORE_NAME); 

をして、ちょうどグローバルテーブルを更新してしまうMETRIC_CHANGES_TOPICに私のaggregatedMetrics KTableへの更新をストリーミング:私はこれを使用して考えました。ただし、各インスタンスは、グローバル・テーブルに対する各更新時に他のインスタンスの集計を上書きするだけです。

グローバルアグリゲーションを実行する方法はありますか?

答えて

1

解決方法は私にとって正しいです。

これは正しく鳴らない:

しかし、各インスタンスは、単にグローバルテーブルへの更新ごとに他のインスタンス集計を上書きすることになります。

集約はキーベースで行われることに注意してください。したがって、異なるインスタンスが異なるキーに集約されるため、各インスタンスはただGlobalKTableの独自のキーを更新します。

+0

ありがとうございます!これについて詳しく説明しているどこかのドキュメントへのリンクを教えてください。 –

+1

ほしいと思っています。http://docs.confluent.io/current/streams/architecture.html –

関連する問題