実装はPythonで行われました。 confluent_kafkaを使用しています。カフカコンシューマの制御メッセージオフセット
私はカフカのトピックからメッセージをポーリングするコンシューマオブジェクトを持っています。メッセージは他の大きなオブジェクトによる処理に使用され、サイズのために各メッセージ処理後にオブジェクトをバックアップする余裕はありません。
定期的にオブジェクトをダンプしてから、コンシューマを手動でコミットします。以下は私が実装したサンプルコードです。
from confluent_kafka import Consumer, KafkaError, TopicPartition
c = Consumer({
'bootstrap.servers': 'myserver',
'group.id': 'mygroup',
'default.topic.config': {'auto.offset.reset': 'smallest'},
'enable.auto.commit': "false"
})
c.subscribe(['mytopic'])
offsets = {}
for i in range(10):
msg = c.poll()
if msg.error():
continue
par = msg.partition()
off = msg.offset()
offsets[p] = off
c.commit(async=False)
print(offsets)
私はこのコードを2回目の実行時に同じパーティションから、印刷されたオフセット前から、次のもの、すなわち+1でなければならない場合、私は、メッセージがオフセット期待します。
しかし、オフセットは大きく前進しました。数百人以上。私はまた、次のように手動で位置を割り当てることを試み
:
lst_part = []
for par, off in offsets.items():
lst_part.append(TopicPartition('mytopic', par, off))
c.assign(lst_part)
# then start polling messages
新たにポーリングメッセージが割り当て1.