0

私はKafkaストリームを勉強していますが、ドキュメントから取り上げたJava 8のWordCountの最初の例に問題があります。Kafkaストリーム - 最初の例WordCountが最初の周回を正しくカウントしない

kafkaストリーム、Kafka ConnectおよびWordCountラムダ式の最新の使用可能なバージョンを使用します。

私は以下のステップに従っています: 私はカフカに入力トピックを作成し、それを出力します。アプリストリーミングを開始し、.txtファイルからいくつかの単語を挿入して入力トピックをアップロードする

最初のカウントでは、出力トピックで正しく単語がグループ化されていますが、カウントが間違っています。同じ言葉を再挿入しようとすると、以前の間違ったカウントからの連続カウントがすべて正しいです。

コンシューマコンソールで入力トピックのダンプを見ると、正しく読み込まれており、ダーティデータがありません。

どのように最初の時間が間違っていますか?

例〔FIRST DATA]: (カフカにおける入力トピック) ( マイクマイク 試験

(出力トピック)(APPストリーミングが実行されている)、HI 12マイク4試験3 HI、HIカジュアルカウント)

[連続するデータ - 同じ単語入力トピックに転記]

(出力トピック)HI 14マイク6試験4

[新しい試み]

(出力トピック)HI 16マイク8試験5

などに....

+0

これは奇妙な音です。問題を確実に再現できますか?これは起こらないはずです。 –

答えて

3

アパッチカフカにおけるWORDCOUNTデモはthe following linesを有する:

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data 
// Note: To re-run the demo, you need to use the offset reset tool: 
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

これは、アプリケーションを再起動すると、入力トピックが最初から読み取られることを意味します。iff存在しませんKafkaに保存されているWordCountアプリのコンシューマオフセット。アプリのコンシューマオフセットは、ある一定量のアプリが使用されなくなった後にカフカで期限切れになります。デフォルトは24時間です(offsets.retention.minutesbroker configuration参照)。あなたはいくつかの時間以前のカフカで実験し、入力されたトピックにテストデータを入力し

  • は、私は次のことが起こったことを想像できます。

  • その後、実験を再開する前に24時間以上休憩しました。
  • アプリケーションが再起動したときに、入力トピックを最初から最後まで読み直して古いテスト入力データを取得し、「膨らんだ」カウントにつなげました。

私は消費者のコンソールで入力トピックダンプを探している場合、それが適切にロードされるとダーティデータはありません。

あなたはCLIオプション--from-beginningを(https://kafka.apache.org/documentation/#quickstart_consumeを参照)を追加しながら、コンソール消費者に再び入力トピックを見て、上記の私の仮説を検証することができます。 (デフォルトのブローカ構成がより古いデータを削除しますマイナスその間にカフカのトピックから削除された可能性があるすべてのデータ - これはあなたのトピック「yourInputTopic」で利用可能なすべてのデータが表示されます

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourInputTopic --from-beginning 

7日、参照:)。

+0

あなたの答えをありがとう。実際に私が24時間後(新しいオフセット)にテストしていたとき、私は古いトピックを削除していました(私はキャンセルを有効にしました)、新しいクリーンな実行のためにゼロから再作成していました。問題が再現されました。しかし、今では、サンプルコードの中でstreams streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG、 "最も早い")を追加しました。これはうまくいくようです。たぶん私はこれを正確には解決していないのですが、うまくいきます。 –

+0

これは今でもうれしいです。 –

+0

私は2,3週間前に同様の問題を抱えていましたが、時にはマイナスでした。それは何か似たようなものによって引き起こされるのでしょうか? – foxygen

関連する問題