2016-04-03 9 views
0

イムオフセットを設定することによって、トピックからのデータを消費するが、アサーションエラーを取得しようとしている -てAssertionError:未割り当てパーティション

from kafka import KafkaConsumer 

consumer = KafkaConsumer('foobar1', 
         bootstrap_servers=['localhost:9092']) 
print 'process started' 
print consumer.partitions_for_topic('foobar1') 
print 'done' 
consumer.seek(0,10) 

for message in consumer: 
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, 
              message.offset, message.key, 
              message.value)) 
print 'process ended' 

エラー: -

Traceback (most recent call last): 
    File "/Users/pn/Documents/jobs/ccdn/kafka_consumer_1.py", line 21, in <module> 
    consumer.seek(0,10) 
    File "/Users/pn/.virtualenvs/vpsq/lib/python2.7/site-packages/kafka/consumer/group.py", line 549, in seek 
    assert partition in self._subscription.assigned_partitions(), 'Unassigned partition' 
AssertionError: Unassigned partition 

答えて

1

あなたはconsumer.assignを(呼び出す必要がありますが)シークを呼び出す前にTopicPartitionsのリストを表示します。 また、seekの第1引数もTopicPartitionであることに注意してください。 Kafka 0.9kafka-pythonと私の場合はKafkaConsumer API

0

を参照してください、パーティションの割り当てはfor message in consumerの間に起こっています。だから、シークのoprationは繰り返しの後にする必要があります。私はグループのオフセットを次のコードでリセットしました:

関連する問題