2017-06-23 17 views
2

私はkafkaストリームを使用しています。私はKTableをトピックに具体化しようとしています。Apache Kafka StreamsトピックへのKTablesのマテリアライズが遅いようです

これは動作しますが、30秒ごとに実行されるようです。

カフカストリームは、KTableの現在の状態をトピックに具体化することをどのようにするかを決定しますか?

この時間を短縮し、さらに「リアルタイム」にする方法はありますか?

は、ここで私は30代に

// Stream of random ints: (1,1) -> (6,6) -> (3,3) 
// one record every 500ms 
KStream<Integer, Integer> kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC); 

// grouping by key 
KGroupedStream<Integer, Integer> byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer()); 

// same behaviour with or without the TimeWindow 
KTable<Windowed<Integer>, Long> count = byKey.count(TimeWindows.of(1000L),"total"); 

// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC); 
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC); 
+0

あなたが達成しようとしているものについての詳細は明らかだろうか?いくつかのコード? ktable.to( "topic_name")のようなことをやっているということですか? –

+0

はいこれはまさに私がやっていることです –

+1

あなたの問題を理解していますが、残念ながら私はこの特定のケースをまだ決して調整していません(私たちは実際にこれに近いものを必要とします速い/最新の)それは私が利用可能な構成で遊ぶことを始めたと言った:http://kafka.apache.org/documentation/#configurationおそらく最初に次の: ブローカレベル:log.flush。*もの、オフセット.commit.timeout.ms、 プロデューサのタイムアウト。 トピックレベル:フラッシュ。* ones ストリームレベル:コミット。* ones 解決策が見つかったらここに投稿してください。役立つでしょう –

答えて

2

これはcommit.interval.msによって制御され、デフォルト値を使用している実際のコードです。ここで詳細: http://docs.confluent.io/current/streams/developer-guide.html

キャッシングのセマンティクスは、データが状態ストアにフラッシュし、いつでもcommit.interval.ms又はcache.max.bytes.bufferingの最も早い次の下流プロセッサノードに転送されることです(キャッシュ圧)ヒット。

、ここで:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams

+0

Super、thank you Michal –