2017-02-01 4 views
1

私はKafka 10.0とhttps://github.com/Shopify/saramaを使用しています。 私は消費者が処理した最新のメッセージのオフセットを取得しようとしています。Golang Kafka 10でGroupIDを取得するには?

これを行うには、グループ名が必要なNewOffsetManagerFromClient(group string, client Client)というメソッドが見つかりました。

消費者グループ名を取得するにはどうすればよいですか?

offsets := make(map[int32]int64) 

config := sarama.NewConfig() 
config.Consumer.Offsets.CommitInterval = 200 * time.Millisecond 
config.Version = sarama.V0_10_0_0 

// config.Consumer.Offsets.Initial = sarama.OffsetNewest 
cli, _ := sarama.NewClient(kafkaHost, config) 
defer cli.Close() 

offsetManager, _ := sarama.NewOffsetManagerFromClient(group, cli) 
for _, partition := range partitions { 
    partitionOffsetManager, _ := offsetManager.ManagePartition(topic, partition) 
    offset, _ := partitionOffsetManager.NextOffset() 

    offsets[partition] = offset 
} 
return offsets 

私は

consumer := sarama.NewConsumer(connections, config) 

を消費者に作成したが、私は消費者のグループを作成し、そのグループ名を取得する方法がわかりません。

offsetManager, _ := sarama.NewOffsetManagerFromClient(group, cli) 

同様に、あなたのトピックのメッセージを消費した消費者が同じオフセットマネージャを使用しなければならないと、彼らは、特定の使用しているでしょう:あなたは現在のオフセットを見つけるために、独自のオフセットマネージャを作成しようとしている

答えて

0

グループID。そのグループIDを使用します。

+0

私はコンシューマグループを作成し、それに自分の選択した特定の名前を付ける必要がありますか?それはShopify/saramaを使用して行うことができますか? –

+0

コンシューマを作成する方法はいくつかあります。 [NewConsumerFromClient](https://godoc.org/github.com/Shopify/sarama#NewConsumerFromClient)を使用すると、コンシューマを作成する前にクライアントでオフセットを設定できます。オフセットを見つけるには、上記のオフセットマネージャを使うために同じグループidを使います。 – Niko

関連する問題