2016-07-15 18 views
0

AFAIK、Python kafkaコンシューマグループIDの問題

kafkaのパーティションと(コンシューマ)グループの概念が、並列性を実装するために導入されました。私はpythonを使ってkafkaを使って作業しています。私は、(2つの)パーティションを持つ特定のトピックを持っています。これは、2つのコンシューマを持つコンシューマ・グループを起動すると、別のパーティションにマッピング(登録)されることを意味します。

しかし、kafkaのライブラリをPythonで使用すると、私は奇妙な問題に遭遇しました。私は基本的に同じグループidを持つ2人の消費者を始め、メッセージを消費するスレッドを開始しました。

しかし、カフカストリームのすべてのメッセージは、両方で消費されています。これは私にはばかげているようで、概念的には間違っています。とにかく、消費者を特定のパーティションに手動でマップすることはできますか(自動的に別のパーティションにマップされていない場合)?ここ

コードである:

ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki') 
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki') 
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg') 
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg') 
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas') 
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas') 

期待しながら各々の一つであった:ここ

from kafka import KafkaConsumer 
import thread 

def con1(consumer): 
    for msg in consumer: 
     print msg 

consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092']) 
consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092']) 

thread.start_new_thread(con1, (consumer1,)) 
thread.start_new_thread(con1, (consumer2,)) 

私はカフカコンソール・プロデューサを使用して製造さいくつかのメッセージのために出力されます。ところで、このトピックk-testには2つのパーティションがあります。私はあなたがdocumentsに基づいて、この機能をサポートしていないカフカ0.8以下のバージョン、作業していると思います

+0

どのようなKafka Pythonクライアントをお使いですか?複数ご利用いただけます:https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python Confluentを使用することをお勧めします。https://github.com/confluentinc/confluent-kafka -python –

答えて

0

...一部の機能はしかし、新しいブローカー上で有効になります。 例えば、完全に協調消費者団体 - すなわち、動的なパーティション同じグループ内の複数の消費者へ 割り当て -

0
from kafka import KafkaConsumer 
from kafka import TopicPartition 

TOPIC = "k-test" 
PARTITION_0 = 0 
PARTITION_1 = 1 

consumer_0 = KafkaConsumer(
    TOPIC, group_id='grp1', bootstrap_servers=['10.50.23.120:9092'] 
) 
consumer_1 = KafkaConsumer(
    TOPIC, group_id='grp1', bootstrap_servers=['10.50.23.120:9092'] 
) 
topic_partition_0 = TopicPartition(TOPIC, PARTITION_0) 
topic_partition_1 = TopicPartition(TOPIC, PARTITION_1) 
# format: topic, partition 
consumer_0.assign([topic_partition_0]) 
consumer_1.assign([topic_partition_1]) 

アサイン()であるかもしれない... 0.9+カフカのブローカーを使用する必要がありますあなたのために働きますが、それを使用すると、消費者の仕事が止まったときにkafkaは自動的に消費者のバランスを取らないでしょう。

0

bin/kafka-consumer-groups.shコマンドラインツールを実行して、使用しているPython Kafkaクライアントが適切なコンシューマグループ管理をサポートしているかどうかを確認してください。両方のコンシューマが実際に同じグループに属している場合は、相互に排他的なパーティションからメッセージを取得する必要があります。

関連する問題