2016-06-25 7 views
0

特定のカフカパーティションにpythonコンシューマスクリプトを添付する方法。
コンシューマスクリプトのインスタンスを2つ実行すると(以下に示す)、それぞれがランダムに1つのパーティションを取得し、その特定のパーティションのすべてのメッセージを消費/印刷します。

しかし、ディスク上のローカルファイルという名前のパーティションにこれらのメッセージを出力する必要があるため、事前に宣言されたパーティションIDにスクリプトの各インスタンスを添付すると、作業が楽になります。
ファイル名:特定のパーティションにKafaConsumerをアタッチする

Date/Hour/PARTITION_ID-0.CSV 
Date/Hour/PARTITION_ID-1.CSV 

どのように達成するかについてのアイデア。
気軽に代替手段を提案する。

カフカのセットアップ:(Pythonで)

Topic:my-topic3 PartitionCount:2 ReplicationFactor:2 Configs: 
Topic: my-topic3 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 
Topic: my-topic3 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 

カフカ消費者のスクリプト[FIX WITH ]

from kafka import KafkaConsumer 
from kafka import TopicPartition 

# To consume latest messages and auto-commit offsets 
#consumer = KafkaConsumer('my-topic3', 
#       group_id='my-group', 
#       bootstrap_servers=['192.168.150.80:9092']) 

# To consume messages from a specific PARTITION [ FIX ] 
consumer = KafkaConsumer(bootstrap_servers='192.168.150.80:9092') 
consumer.assign([TopicPartition('my-topic3', 1)]) 

for message in consumer: 
    # message value and key are raw bytes -- decode if necessary! 
    # e.g., for unicode: `message.value.decode('utf-8')` 
    print ("Topic= %s : Partition= %d : Offset= %d: key= %s value= %s" % (message.topic, message.partition, 
              message.offset, message.key, 
              message.value)) 

更新:以下示唆したように、私は割り当て機能を使用しますが、上のまま不正な状態エラーが発生する
割り当て関数

consumer.assign([TopicPartition('my-topic3',1)]) 

エラー

Traceback (most recent call last): 
    File "consumerExample.py", line 13, in <module> 
    consumer.assign([TopicPartition('my-topic3',1)]) 
    File "/usr/lib/python2.7/site-packages/kafka/consumer/group.py", line 278, in assign 
    self._subscription.assign_from_user(partitions) 
    File "/usr/lib/python2.7/site-packages/kafka/consumer/subscription_state.py", line 189, in assign_from_user 
    raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) 
kafka.errors.IllegalStateError: You must choose only one way to configure 

答えて

0

You can use the assign() method手動で消費者に1つ以上のパーティションを割り当てることができます。

ありis some example code here:そのような迅速な応答を

>>> # manually assign the partition list for the consumer 
>>> from kafka import TopicPartition 
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') 
>>> consumer.assign([TopicPartition('foobar', 2)]) 
>>> msg = next(consumer) 
+0

感謝。既に運を使わずにassign関数を試しました。不正な状態エラーの取得 kafka.errors.IllegalStateError:コンシューマを構成するには、(1)特定のトピックを名前で購読する、(2)正規表現パターンに一致するトピックを購読する、トピックパーティション。 – coredump

+0

コンシューマを作成するときに 'group_id = 'my-group''を削除する必要があります。コンシューマを作成するときは、動的パーティション割り当てのために結合するグループを表すためです。 – ck1

+0

ありがとう、それは動作します。しかし、今では私のカフカの基本について少し混乱しています。なぜカフカコンシューマーがグループに縛られ、特定のパーティションを聞いていないのですか?ここにポイントがないのですか? – coredump

関連する問題