多くの場合、いくつかのメッセージの集約を含むGlobalKTable
が必要です。今、私の単一のインスタンスKTable
セットアップは次のようになります。各インスタンスが唯一ではないが受信したメッセージのすべてのために、受信したメッセージの更新された指標を、持っているので、Kafka Streams - グローバルメトリック集計の方法
final KTable<String, Double> aggregatedMetrics = eventStream
.groupByKey(Serdes.String(), jsonSerde)
.aggregate(
() -> 0d,
new MetricsAggregator(),
Serdes.Double(),
LOCAL_METRICS_STORE_NAME);
明らかに、これはスケールしません他のすべてのインスタンス。
final KStreamBuilder builder = new KStreamBuilder();
builder.globalTable(METRIC_CHANGES_TOPIC, METRICS_STORE_NAME);
をして、ちょうどグローバルテーブルを更新してしまうMETRIC_CHANGES_TOPIC
に私のaggregatedMetrics
KTableへの更新をストリーミング:私はこれを使用して考えました。ただし、各インスタンスは、グローバル・テーブルに対する各更新時に他のインスタンスの集計を上書きするだけです。
グローバルアグリゲーションを実行する方法はありますか?
ありがとうございます!これについて詳しく説明しているどこかのドキュメントへのリンクを教えてください。 –
ほしいと思っています。http://docs.confluent.io/current/streams/architecture.html –