私はkafkaストリームを使用しています。私はKTableをトピックに具体化しようとしています。Apache Kafka StreamsトピックへのKTablesのマテリアライズが遅いようです
これは動作しますが、30秒ごとに実行されるようです。
カフカストリームは、KTableの現在の状態をトピックに具体化することをどのようにするかを決定しますか?
この時間を短縮し、さらに「リアルタイム」にする方法はありますか?
は、ここで私は30代に
// Stream of random ints: (1,1) -> (6,6) -> (3,3)
// one record every 500ms
KStream<Integer, Integer> kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC);
// grouping by key
KGroupedStream<Integer, Integer> byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer());
// same behaviour with or without the TimeWindow
KTable<Windowed<Integer>, Long> count = byKey.count(TimeWindows.of(1000L),"total");
// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
あなたが達成しようとしているものについての詳細は明らかだろうか?いくつかのコード? ktable.to( "topic_name")のようなことをやっているということですか? –
はいこれはまさに私がやっていることです –
あなたの問題を理解していますが、残念ながら私はこの特定のケースをまだ決して調整していません(私たちは実際にこれに近いものを必要とします速い/最新の)それは私が利用可能な構成で遊ぶことを始めたと言った:http://kafka.apache.org/documentation/#configurationおそらく最初に次の: ブローカレベル:log.flush。*もの、オフセット.commit.timeout.ms、 プロデューサのタイムアウト。 トピックレベル:フラッシュ。* ones ストリームレベル:コミット。* ones 解決策が見つかったらここに投稿してください。役立つでしょう –