2017-04-05 18 views
1

実装はPythonで行われました。 confluent_kafkaを使用しています。カフカコンシューマの制御メッセージオフセット

私はカフカのトピックからメッセージをポーリングするコンシューマオブジェクトを持っています。メッセージは他の大きなオブジェクトによる処理に使用され、サイズのために各メッセージ処理後にオブジェクトをバックアップする余裕はありません。

定期的にオブジェクトをダンプしてから、コンシューマを手動でコミットします。以下は私が実装したサンプルコードです。

from confluent_kafka import Consumer, KafkaError, TopicPartition 

c = Consumer({ 
    'bootstrap.servers': 'myserver', 
    'group.id': 'mygroup', 
    'default.topic.config': {'auto.offset.reset': 'smallest'}, 
    'enable.auto.commit': "false" 
}) 
c.subscribe(['mytopic']) 

offsets = {} 

for i in range(10): 
    msg = c.poll() 

    if msg.error(): 
     continue 

    par = msg.partition() 
    off = msg.offset() 
    offsets[p] = off 

c.commit(async=False) 

print(offsets) 

私はこのコードを2回目の実行時に同じパーティションから、印刷されたオフセット前から、次のもの、すなわち+1でなければならない場合、私は、メッセージがオフセット期待します。

しかし、オフセットは大きく前進しました。数百人以上。私はまた、次のように手動で位置を割り当てることを試み

lst_part = [] 

for par, off in offsets.items(): 
    lst_part.append(TopicPartition('mytopic', par, off)) 

c.assign(lst_part) 

# then start polling messages 

新たにポーリングメッセージが割り当て1.

答えて

1

c.commit(async=False) +オフセットされていないメッセージがされたすべての消費パーティションをコミットしますpoll()コールによってクライアントからアプリケーションに返されます。

あなたがcommit()に明示的な[TopicPartition(..)]リストを渡すことができますいずれかのコミットに相殺した上で、よりきめの細かい制御をしたい場合は、あなたがしたいメッセージ/オフセットについてstore_offsets()を呼び出して、明示的またはauto.offset.storeを無効にして、(last_message_offset + 1をコミットすることを確認してください)将来のために店舗commit()コール。

store_offsets()はmasterでのみ利用可能であり、まだリリースされているconfluent-kafka-pythonクライアントでは利用できませんが、間もなくリリースされます。