私はカフカストリームを実験し始めています。私はhttps://kafka.apache.org/0110/documentation/streams/quickstartに従っています。カフカのカウント数が更新されていない
私のサンドボックスは、Ubuntu 16.04.2 LTS、Kafka 0.11.0.0およびScala 2.11.11を実行するボックスです。
としては、カフカストリームクイックスタートガイドで説明し、ここで私は、その後の手順は、次のとおりです。
echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
後者のコマンドを使用してストリーム・WORDCOUNT出力を見て、私の標準出力は次のように示しています
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
:その後
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
、ビン/ kafka-console-consumer.shコマンドを中断することなく、私は次のようにコンソールプロデューサーを再実行します
この新しい追加によってもたらされた変更を反映して標準出力が変更されないのは驚きです。私の理解では、file-input.txtは追加のデータを生成するために使われていたので、単語数が更新されているはずです(すべてのトークンは2回カウントされます)。 私の推論には何が間違っていますか?
もちろん、 'bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo'は全期間実行されていますか?二重チェックするだけで、あなたは実際に新しい値をそこに追加するために、 'streams-file-input'トピックでコンシューマーを実行する必要があります。 –
oh oh ... WordCountDemoがもう実行されていなかったことに気付きませんでした。もう一度それを実行すると出力が正しいように見えました。ありがとうございました !しかし、bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemoは5秒後に停止します。私の理解では、それは永遠に走ると思われます。私は見逃していますか? – SCO