2016-12-05 20 views
0

最初のバッチ: - 100フラットファイルからデータを取り出し、配列にロードしてカフカプロデューサにバイト配列として1つずつ挿入しようとしています。Golang Kafkaはすべてのメッセージを消費していません。offsetnewest

2番目のバッチ: - 私はkafkaコンシューマーから消費し、NoSQLデータベースに挿入しています。

私はKafkaのshopify sarama golangパッケージの設定ファイルでOffsetnewsetを使用します。

私はカフカにメッセージを受信して​​挿入できますが、消費中最初のメッセージのみを受信して​​います。私はsaramaの設定で最新のOffsetを与えて以来。 ここですべてのデータを取得するにはどうすればいいですか?

答えて

1

(すなわち:トピック、パーティション、...)の深さで任意のコード以上がなくカフカの設定方法について説明して何かを伝えることができるようすることは困難であるので、いくつかの簡単なチェックは私の心に来る:

  1. 、あなたが生産を開始前設定OffsetNewestで消費し始めると仮定すると、多分起こって一つのことは、あなたが、そのトピックに関するすべてのパーティションからのかかるサラマードキュメントに係るされていないということです、あなたがで明示的に各パーティションを消費する必要がありますPartitionConsumersを作成します。 https://godoc.org/github.com/Shopify/sarama#Consumerの例から:

    partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest) 
    if err != nil { 
        panic(err) 
    } 
    
    ... 
    
    consumed := 0 
    ConsumerLoop: 
    for { 
        select { 
        case msg := <-partitionConsumer.Messages(): 
         log.Printf("Consumed message offset %d\n", msg.Offset) 
         consumed++ 
        case <-signals: 
         break ConsumerLoop 
        } 
    } 
    
  2. あなたは、実際には、かかる後の生産のすべてのイベントを始めており、そのため、それらすべてを読むためにポインタがないOffsetNewestけどOffsetOldest代わりです。

私はあなたより多くの有益な答えを出すことができなくて残念だけど、あなたには、いくつかのコードを貼り付け以上の詳細を与えるかもしれないならば、我々はより多くのを助けることができます。

関連する問題