1
私はKafka 10.0とhttps://github.com/Shopify/saramaを使用しています。 私は消費者が処理した最新のメッセージのオフセットを取得しようとしています。Golang Kafka 10でGroupIDを取得するには?
これを行うには、グループ名が必要なNewOffsetManagerFromClient(group string, client Client)というメソッドが見つかりました。
消費者グループ名を取得するにはどうすればよいですか?
offsets := make(map[int32]int64)
config := sarama.NewConfig()
config.Consumer.Offsets.CommitInterval = 200 * time.Millisecond
config.Version = sarama.V0_10_0_0
// config.Consumer.Offsets.Initial = sarama.OffsetNewest
cli, _ := sarama.NewClient(kafkaHost, config)
defer cli.Close()
offsetManager, _ := sarama.NewOffsetManagerFromClient(group, cli)
for _, partition := range partitions {
partitionOffsetManager, _ := offsetManager.ManagePartition(topic, partition)
offset, _ := partitionOffsetManager.NextOffset()
offsets[partition] = offset
}
return offsets
私は
consumer := sarama.NewConsumer(connections, config)
を消費者に作成したが、私は消費者のグループを作成し、そのグループ名を取得する方法がわかりません。
offsetManager, _ := sarama.NewOffsetManagerFromClient(group, cli)
同様に、あなたのトピックのメッセージを消費した消費者が同じオフセットマネージャを使用しなければならないと、彼らは、特定の使用しているでしょう:あなたは現在のオフセットを見つけるために、独自のオフセットマネージャを作成しようとしている
私はコンシューマグループを作成し、それに自分の選択した特定の名前を付ける必要がありますか?それはShopify/saramaを使用して行うことができますか? –
コンシューマを作成する方法はいくつかあります。 [NewConsumerFromClient](https://godoc.org/github.com/Shopify/sarama#NewConsumerFromClient)を使用すると、コンシューマを作成する前にクライアントでオフセットを設定できます。オフセットを見つけるには、上記のオフセットマネージャを使うために同じグループidを使います。 – Niko