2017-10-03 1 views
1

confluent-kafka-goとコンフルエントを介して設定された単一パーティションの単一ブローカーkafkaを使用してgolangに基本チャネルベースのプロデューサを作成しました。 acksを通して反復中に時々生成すると、正常に生成されたメッセージのオフセットはとして受信されます。カフカは生成時にオフセットを0に戻す

サンプルログ:

{ "レベル": "情報"、 "MSG": "トピックテストに正常に公開されたが、オフセット0、パーティション0および長さ1316"、 "時間": "2017- "0"、 "0"、 "0"、 "0"、 "0"、 "0"、 "0" : "2017-10-03T17:03:43 + 05:30"}

これは、カフカのACKを反復関数である:

func (kc *KafkaClient) HandleAcknowledgements() { 
    for event := range kc.AckChannel { 
    message := event.(*kafka.Message) 

    if message.TopicPartition.Error != nil { 
     log.Errorf("Delivery Failed: Partition: %d Reason: %v", message.TopicPartition.Partition, message.TopicPartition.Error) 
    } else { 
     if message.TopicPartition.Offset == 0 { 
     bodyLen := len(message.Value) 
     log.Infof("Published successfully to topic %s, offset %s, Partition %d and Length %d", *message.TopicPartition.Topic, message.TopicPartition.Offset, message.TopicPartition.Partition, bodyLen) 
     } 
    } 
    } 
} 

マイ保持ポリシーは以下の通りです:

log.retention.hours=168 

ヘルプの任意の並べ替えが非常にこれが起こる理由として、素晴らしいことでしょう!

ありがとうございました。

+0

Cクライアントの上にあるラッパーだけのクライアントについては、https://github.com/Shopify/saramaを試してみましたか? –

+0

はい、この問題を再現するために試してみましたが、おそらくこの問題は特定のライブラリ自体と関係があります。私は、メッセージが生成され、オフセットが内部的に正しく維持されていることに気付きました。そのオフセットは、時々、オフセットに反映されていません。実際の理由を知るにはさらに深く掘り下げる必要があります。私は具体的に何かを見つけたら、投稿を更新します!ありがとう@YandryPozo! – nams

答えて

0

設定でproduce.offset.reportをtrueに設定してみてください。

プロデューサーのためにtrueにproduce.offset.reportを設定してみてください、そうでない場合 それだけで各 バッチ/ messageSetの最後のメッセージのオフセットを報告します。

goクライアントはデフォルトで実際にこれを実行する必要があります。

https://github.com/confluentinc/confluent-kafka-go/issues/95#issuecomment-330837647

これはあなたの問題を解決しない場合は、コードを投稿し、あなたが生成するために使用configのだろうか?

関連する問題