confluent-kafka-goとコンフルエントを介して設定された単一パーティションの単一ブローカーkafkaを使用してgolangに基本チャネルベースのプロデューサを作成しました。 acks
を通して反復中に時々生成すると、正常に生成されたメッセージのオフセットはとして受信されます。カフカは生成時にオフセットを0に戻す
サンプルログ:
{ "レベル": "情報"、 "MSG": "トピックテストに正常に公開されたが、オフセット0、パーティション0および長さ1316"、 "時間": "2017- "0"、 "0"、 "0"、 "0"、 "0"、 "0"、 "0" : "2017-10-03T17:03:43 + 05:30"}
これは、カフカのACKを反復関数である:
func (kc *KafkaClient) HandleAcknowledgements() {
for event := range kc.AckChannel {
message := event.(*kafka.Message)
if message.TopicPartition.Error != nil {
log.Errorf("Delivery Failed: Partition: %d Reason: %v", message.TopicPartition.Partition, message.TopicPartition.Error)
} else {
if message.TopicPartition.Offset == 0 {
bodyLen := len(message.Value)
log.Infof("Published successfully to topic %s, offset %s, Partition %d and Length %d", *message.TopicPartition.Topic, message.TopicPartition.Offset, message.TopicPartition.Partition, bodyLen)
}
}
}
}
マイ保持ポリシーは以下の通りです:
log.retention.hours=168
ヘルプの任意の並べ替えが非常にこれが起こる理由として、素晴らしいことでしょう!
ありがとうございました。
Cクライアントの上にあるラッパーだけのクライアントについては、https://github.com/Shopify/saramaを試してみましたか? –
はい、この問題を再現するために試してみましたが、おそらくこの問題は特定のライブラリ自体と関係があります。私は、メッセージが生成され、オフセットが内部的に正しく維持されていることに気付きました。そのオフセットは、時々、オフセットに反映されていません。実際の理由を知るにはさらに深く掘り下げる必要があります。私は具体的に何かを見つけたら、投稿を更新します!ありがとう@YandryPozo! – nams