2016-10-05 19 views
1
  • カフカサーババージョン0.8.2.2 =
  • librdkafka(カフカ++クライアントC)バージョン0.9.1 =

二消費者が一方にありますグループ。 2人の消費者がトピックから異なるメッセージを受け取る必要があります。librdkafkaで "group.id"を使用するにはどうすればよいですか? (カフカバージョン "0.8.2.2" である)、例えば

私は以下のように使いますが、うまくいかないようです。 2人の消費者がトピックからすべてのメッセージを取得します。

std::string topic_str = "sample"; 
std::string errstr; 
int32_t partition = 0; 
int64_t start_offset = RdKafka::Topic::OFFSET_END; 

RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); 
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); 

//...omit... 

conf->set("broker.version.fallback", "0.8.2.2", errstr); 
conf->set("group.id", "group_001", errstr); 

//...omit... 

RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr); 
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr); 
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset); 

//...omit... 

while (1) { 

    //...omit... 

    RdKafka::Message *msg = consumer->consume(topic, partition, 1000); 

    //print message and offset 
} 

答えて

1

librdkafkaは、新しいブローカベースのバランス消費者団体(KafkaConsumerクラスは、)カフカ0.9で添加suppoprts。 (Kafka 0.8のコンシューマーグループバランシングはZookeeperに基づいており、正式なScalaクライアントのみで実装されています)

バランスコンシューマーのフォームを持たないレガシー低レベルコンシューマーサポート。

カフカクラスターを0.9(または0.10!)にアップグレードし、代わりに新しいKafkaConsumerクラスを使用するようにコードを変更することをお勧めします。

関連する問題