2017-08-30 13 views
0

私はカフカストリームを実験し始めています。私はhttps://kafka.apache.org/0110/documentation/streams/quickstartに従っています。カフカのカウント数が更新されていない

私のサンドボックスは、Ubuntu 16.04.2 LTS、Kafka 0.11.0.0およびScala 2.11.11を実行するボックスです。

としては、カフカストリームクイックスタートガイドで説明し、ここで私は、その後の手順は、次のとおりです。

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt 

bin/kafka-topics.sh --create \ 
    --zookeeper localhost:2181 \ 
    --replication-factor 1 \ 
    --partitions 1 \ 
    --topic streams-file-input 

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt 

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo 

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ 
    --topic streams-wordcount-output \ 
    --from-beginning \ 
    --formatter kafka.tools.DefaultMessageFormatter \ 
    --property print.key=true \ 
    --property print.value=true \ 
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ 
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer 

後者のコマンドを使用してストリーム・WORDCOUNT出力を見て、私の標準出力は次のように示しています

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt 
:その後
all 1 
streams 1 
lead 1 
to 1 
kafka 1 
hello 1 
kafka 2 
streams 2 
join 1 
kafka 3 
summit 1 

、ビン/ kafka-console-consumer.shコマンドを中断することなく、私は次のようにコンソールプロデューサーを再実行します

この新しい追加によってもたらされた変更を反映して標準出力が変更されないのは驚きです。私の理解では、file-input.txtは追加のデータを生成するために使われていたので、単語数が更新されているはずです(すべてのトークンは2回カウントされます)。 私の推論には何が間違っていますか?

+0

もちろん、 'bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo'は全期間実行されていますか?二重チェックするだけで、あなたは実際に新しい値をそこに追加するために、 'streams-file-input'トピックでコンシューマーを実行する必要があります。 –

+0

oh oh ... WordCountDemoがもう実行されていなかったことに気付きませんでした。もう一度それを実行すると出力が正しいように見えました。ありがとうございました !しかし、bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemoは5秒後に停止します。私の理解では、それは永遠に走ると思われます。私は見逃していますか? – SCO

答えて

2

ワードカウントのデモは、ソースに見られるように、5秒後に停止するように設計されています:https://github.com/apache/kafka/blob/0.11.0.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java#L88

は5秒後に停止しないものについては、上記のソースの最新版を参照してください、しかし、あなたがヒットした場合にのみctrl-c:https://github.com/apache/kafka/blob/0.11.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

関連する問題