特定のカフカパーティションにpythonコンシューマスクリプトを添付する方法。
コンシューマスクリプトのインスタンスを2つ実行すると(以下に示す)、それぞれがランダムに1つのパーティションを取得し、その特定のパーティションのすべてのメッセージを消費/印刷します。
しかし、ディスク上のローカルファイルという名前のパーティションにこれらのメッセージを出力する必要があるため、事前に宣言されたパーティションIDにスクリプトの各インスタンスを添付すると、作業が楽になります。
ファイル名:特定のパーティションにKafaConsumerをアタッチする
Date/Hour/PARTITION_ID-0.CSV
Date/Hour/PARTITION_ID-1.CSV
どのように達成するかについてのアイデア。
気軽に代替手段を提案する。
カフカのセットアップ:(Pythonで)
Topic:my-topic3 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: my-topic3 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: my-topic3 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
カフカ消費者のスクリプト[FIX WITH ]
from kafka import KafkaConsumer
from kafka import TopicPartition
# To consume latest messages and auto-commit offsets
#consumer = KafkaConsumer('my-topic3',
# group_id='my-group',
# bootstrap_servers=['192.168.150.80:9092'])
# To consume messages from a specific PARTITION [ FIX ]
consumer = KafkaConsumer(bootstrap_servers='192.168.150.80:9092')
consumer.assign([TopicPartition('my-topic3', 1)])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("Topic= %s : Partition= %d : Offset= %d: key= %s value= %s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
更新:以下示唆したように、私は割り当て機能を使用しますが、上のまま不正な状態エラーが発生する
割り当て関数
consumer.assign([TopicPartition('my-topic3',1)])
エラー
Traceback (most recent call last):
File "consumerExample.py", line 13, in <module>
consumer.assign([TopicPartition('my-topic3',1)])
File "/usr/lib/python2.7/site-packages/kafka/consumer/group.py", line 278, in assign
self._subscription.assign_from_user(partitions)
File "/usr/lib/python2.7/site-packages/kafka/consumer/subscription_state.py", line 189, in assign_from_user
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
kafka.errors.IllegalStateError: You must choose only one way to configure
感謝。既に運を使わずにassign関数を試しました。不正な状態エラーの取得 kafka.errors.IllegalStateError:コンシューマを構成するには、(1)特定のトピックを名前で購読する、(2)正規表現パターンに一致するトピックを購読する、トピックパーティション。 – coredump
コンシューマを作成するときに 'group_id = 'my-group''を削除する必要があります。コンシューマを作成するときは、動的パーティション割り当てのために結合するグループを表すためです。 – ck1
ありがとう、それは動作します。しかし、今では私のカフカの基本について少し混乱しています。なぜカフカコンシューマーがグループに縛られ、特定のパーティションを聞いていないのですか?ここにポイントがないのですか? – coredump