2016-02-16 33 views
10

私はKafkaのPythonハイレベルコンシューマを使用しており、トピックの各パーティションの最新のオフセットを知りたいと考えています。しかし、私はそれを働かせることはできません。kafkaトピックのパーティションの最新のオフセットを取得する方法は?

from kafka import TopicPartition 
from kafka.consumer import KafkaConsumer 

con = KafkaConsumer(bootstrap_servers = brokers) 
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] 

con.assign(ps) 
for p in ps: 
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p)) 

print "Subscription = %s"%con.subscription() 
print "con.seek_to_beginning() = %s"%con.seek_to_beginning() 

しかし、私が手出力は、私がassignを使用して、代替的なアプローチを持っていますが、結果はそれは私がかもしれないことを文書の一部から思わ同じ

con = KafkaConsumer(bootstrap_servers = brokers) 
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] 

con.assign(ps) 
for p in ps: 
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p)) 

print "Subscription = %s"%con.subscription() 
print "con.seek_to_beginning() = %s"%con.seek_to_beginning() 
print "con.seek_to_end() = %s"%con.seek_to_end() 

ある

For partition 0 highwater is None 
For partition 1 highwater is None 
For partition 2 highwater is None 
For partition 3 highwater is None 
For partition 4 highwater is None 
For partition 5 highwater is None 
.... 
For partition 96 highwater is None 
For partition 97 highwater is None 
For partition 98 highwater is None 
For partition 99 highwater is None 
Subscription = None 
con.seek_to_beginning() = None 
con.seek_to_end() = None 

ですfetchが発行されていない場合は、この動作が発生します。しかし、私はそれを強制する方法を見つけることができません。私は間違って何をしていますか?

トピックの最新のオフセットを取得する別の方法と簡単な方法がありますか?

+0

ない100%正、私はあなたのコードはカフカ-python'が実際にブローカーに接続された '前HIGHWATERの値を返していると思います。 'KafkaConsumer'は非同期なので、実際には、ハイウォーターの値を入力するメッセージを消費しなければならないと思います。https://github.com/dpkp/kafka-python/issues/509#issuecomment-178114516 –

答えて

23

最後にこの日といくつかの間違ったスタートを過ごした後、私は解決策を見つけてそれを働かせることができました。他人がそれを参照できるように彼女を投稿する。あなたはカフカ/ binに存在カフカのシェルスクリプトを使用したい場合は

from kafka import SimpleClient 
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy 
from kafka.common import OffsetRequestPayload 

client = SimpleClient(brokers) 

partitions = client.topic_partitions[topic] 
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()] 

offsets_responses = client.send_offset_request(offset_requests) 

for r in offsets_responses: 
    print "partition = %s, offset = %s"%(r.partition, r.offsets[0]) 
+1

方法はありますかパーティションごとにコンシューマ/グループごとに現在の/次のオフセットを取得しますか? – GreenThumb

+0

悲しいことに、SimpleClientは廃止予定です。上記のoffsets_responsesはFailedPayloadsErrorを返します。FailedPayloadsError – dreynold

9

、あなたはkafka-run-class.shを使用することにより、最新かつ最小のオフセットを得ることができます。最新のオフセットコマンドを取得するには

この

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topiname 

ようになりますあなたはlink

を次の中からオフセットシェルを入手に関する詳細な情報を見つけることができる最小のオフセットコマンドを取得するには、この

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topiname 

のようになります。

希望します。

4
from kafka import KafkaConsumer, TopicPartition 

TOPIC = 'MYTOPIC' 
GROUP = 'MYGROUP' 
BOOTSTRAP_SERVERS = ['kafka01:9092', 'kafka02:9092'] 

consumer = KafkaConsumer(
     bootstrap_servers=BOOTSTRAP_SERVERS, 
     group_id=GROUP, 
     enable_auto_commit=False 
    ) 


for p in consumer.partitions_for_topic(TOPIC): 
    tp = TopicPartition(TOPIC, p) 
    consumer.assign([tp]) 
    committed = consumer.committed(tp) 
    consumer.seek_to_end(tp) 
    last_offset = consumer.position(tp) 
    print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, (last_offset - committed))) 

consumer.close(autocommit=False) 
0

これを達成するための別の方法は、ポーリングによって最新の利用可能パーティションオフセットを取得するためにseek_to_endメソッドを使用して、最後を取得するために、消費者オフセット消費されています。

from kafka import KafkaConsumer 
consumer = KafkaConsumer('my-topic', 
        group_id='my-group', 
        bootstrap_servers=['localhost:9092']) 
consumer.poll() 
consumer.seek_to_end() 

この方法は、特に消費者グループを使用する場合に便利です。

SOURCES:

  1. https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.poll
  2. https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.seek_to_end
+0

私のサーバーには何百ものメッセージがありますが、consumer.poll()は{ – Nick

関連する問題