2016-04-12 27 views
4

kafka-python-1.0.2を使用しています。kafka-python - どのようにパーティションをコミットしますか?

トピックが10個ある場合は、さまざまなパーティションやメッセージをループしながら特定のパーティションをコミットする方法を教えてください。私はちょうどカントは、私が使用したい、ドキュメント内またはそれ以外

ドキュメントから、このどこかの例を見つけるように見える:

consumer.commit(offset=offsets)

具体的に、どのように私はパーティションを作成し、OffsetAndMetadata辞書が必要ですオフセット(オプション、オプション) - {TopicPartition:OffsetAndMetadata}。

私は、関数呼び出しは同じようなものになるだろう期待していた。

consumer.commit(partition, offset)

が、これはケースではないようです。

ありがとうございます。

答えて

3

あなたの質問を書いたときに起こった面白いことに、私はそれを理解したようです。これはうまくいくようです:

meta = consumer.partitions_for_topic(topic) 
options = {} 
options[partition] = OffsetAndMetadata(message.offset, meta) 
consumer.commit(options) 

さらにテストが必要ですが、変更があれば更新されます。

+0

それ以降は何も間違っていますか?私は同じことをしたい。 –

+1

これはこれを行う方法ですが、私はGitLabのkafkaチームに連絡しました。レスポンス:「メタデータは実際には不透明な文字列です。また、Noneを渡すこともできます。メタデータは内部的には使用されませんが、必要に応じてアプリケーション固有のデータを格納する方法としてあります。 –

+0

このスレッドへのリンクです: https://github.com/dpkp/kafka-python/issues/645 –

2

メタデータを使用する必要はありません。 次の例を見てください:

from kafka import TopicPartition 
from kafka.structs import OffsetAndMetadata 
... 
topic = 'your_topic' 
partition = 0 
tp = TopicPartition(topic,partition) 
kafkaConsumer = createKafkaConsumer() 
kafkaConsumer.assign([tp]) 
offset = 15394125 
kafkaConsumer.commit({ 
    tp: OffsetAndMetadata(offset, None) 
}) 

希望します。

1
from kafka import KafkaConsumer 
from kafka import TopicPartition 

TOPIC = "test_topic" 
PARTITION = 0 

consumer = KafkaConsumer(
    group_id=TOPIC, 
    auto_offset_reset="earliest", 
    bootstrap_servers="localhost:9092", 
    request_timeout_ms=100000, 
    session_timeout_ms=99000, 
    max_poll_records=100, 
) 
topic_partition = TopicPartition(TOPIC, PARTITION) 
# format: topic, partition 
consumer.assign([topic_partition]) 
consumer.seek(topic_partition, 1660000) 
# format: TopicPartition, offset. 1660000 is the offset been set. 
for message in consumer: 
    # do something 
  1. これが唯一つ以上のパーティションがある場合は、あなたがそれらのそれぞれに1つずつ割り当て、オフセットを設定する必要があり、そのパーティションのオフセット1つのパーティションとのセットを割り当てます。
  2. aalmeida88の答えは時々、時にはうまくいく場合もありますが、aalmeida88は私に探求のアイデアを与えてくれました。それはまた役に立つ方法のようです。
  3. 気づく必要があるもう一つのことは、自分でパーティションを割り当てると、kafkaマネージャが消費者情報を取得できなかったように見えることです。これは、パーティションを割り当てるときにzookeeperではなくkafkaに設定するためですカフカマネージャーはその情報を入手しないかもしれません。 お手伝い願います!

---編集-----

それを行うには良い方法を探します。

topic_partition = TopicPartition(TOPIC, 
           message.partition) 
consumer.seek(topic_partition, offset_value) 
consumer.commit() 

これは、プログラムで設定する必要があります(まれではない)は、オフセットカフカから取得したメッセージからパーティション情報を抽出して、手動でパーティションを割り当てるための句を保存し、複数のパーティションがある場合利便性をもたらします。

ps:パーティションを1回だけ設定するには、アプリケーションに応じてフラグを設定する必要があります。

関連する問題