AFAIK、Python kafkaコンシューマグループIDの問題
kafkaのパーティションと(コンシューマ)グループの概念が、並列性を実装するために導入されました。私はpythonを使ってkafkaを使って作業しています。私は、(2つの)パーティションを持つ特定のトピックを持っています。これは、2つのコンシューマを持つコンシューマ・グループを起動すると、別のパーティションにマッピング(登録)されることを意味します。
しかし、kafka
のライブラリをPythonで使用すると、私は奇妙な問題に遭遇しました。私は基本的に同じグループidを持つ2人の消費者を始め、メッセージを消費するスレッドを開始しました。
しかし、カフカストリームのすべてのメッセージは、両方で消費されています。これは私にはばかげているようで、概念的には間違っています。とにかく、消費者を特定のパーティションに手動でマップすることはできますか(自動的に別のパーティションにマップされていない場合)?ここ
コードである:
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
期待しながら各々の一つであった:ここ
from kafka import KafkaConsumer
import thread
def con1(consumer):
for msg in consumer:
print msg
consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
thread.start_new_thread(con1, (consumer1,))
thread.start_new_thread(con1, (consumer2,))
私はカフカコンソール・プロデューサを使用して製造さいくつかのメッセージのために出力されます。ところで、このトピックk-test
には2つのパーティションがあります。私はあなたがdocumentsに基づいて、この機能をサポートしていないカフカ0.8以下のバージョン、作業していると思います
どのようなKafka Pythonクライアントをお使いですか?複数ご利用いただけます:https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python Confluentを使用することをお勧めします。https://github.com/confluentinc/confluent-kafka -python –