2016-07-18 9 views
8

Javaを使用してkafkaコンシューマを作成しています。私はメッセージのリアルタイムを保持したいので、1000以上のように消費を待っているメッセージが多すぎる場合は、消費されていないメッセージを放棄し、最新のオフセットから消費を開始する必要があります。kafkaトピックの最新のオフセットを取得するにはどうすればよいですか?

この問題では、最後にコミットされたオフセットとトピックの最新のオフセット(1パーティションのみ)を比較しようとします。これらの2つのオフセットの差が一定量より大きい場合は、その冗長なメッセージを放棄できるように、次のオフセットとしてトピックを追加します。

私の問題は、トピックの最新のオフセットを取得する方法ですが、古い消費者を使用できると言う人もいますが、それは複雑すぎます。新しい消費者にはこの機能がありますか?

答えて

12

新しい消費者も複雑です。

//assign the topic consumer.assign();

//seek to end of the topic consumer.seekToEnd();

//the position is the latest offset consumer.position();

+0

クライアントの現在のオフセットと最新の既知のカフカトピックオフセットとの差異を計算したい場合は、これは機能しません。 – hiaclibe

5
カフカのバージョンについては

:スニペットの上0.10.1.1

// Get the diff of current position and latest offset 
Set<TopicPartition> partitions = new HashSet<TopicPartition>(); 
TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition()); 
partitions.add(actualTopicPartition); 
Long actualEndOffset = this.consumer.endOffsets(partitions).get(actualTopicPartition); 
long actualPosition = consumer.position(actualTopicPartition);   
System.out.println(String.format("diff: %s (actualEndOffset:%s; actualPosition=%s)", actualEndOffset -actualPosition ,actualEndOffset, actualPosition)); 
0
KafkaConsumer<String, String> consumer = ... 
consumer.subscribe(Collections.singletonList(topic)); 
TopicPartition topicPartition = new TopicPartition(topic, partition); 
consumer.poll(0); 
consumer.seekToEnd(Collections.singletonList(topicPartition)); 
long currentOffset = consumer.position(topicPartition) -1; 

特定のトピックやパーティションのオフセットは、現在のコミットメッセージを返します。数。

関連する問題