2017-11-07 13 views
1

私はカフカを初め、独自のストリーミングサービスをカフカに接続するためのプロトタイプを作成しています。最後のメッセージをカフカのトピックに送信

社内のストリームコンシューマは、接続時に最後に受信したメッセージのIDでログオンする必要があるため、トピックに送信された最後のメッセージのキーを取得したいと考えています。

KafkaProducerまたはKafkaConsumerを使用してこれを行うことは可能ですか?

コンシューマを使用して次の操作を実行しようとしましたが、コンソールコンシューマも実行しているときに、再生されたメッセージが表示されます。

// Poll so we know we're connected 
    consumer.poll(100); 
    // Get the assigned partitions 
    Set<TopicPartition> assignedPartitions = consumer.assignment(); 
    // Seek to the end of those partitions 
    consumer.seekToEnd(assignedPartitions); 

    for(TopicPartition partition : assignedPartitions) { 
     final long offset = consumer.committed(partition).offset(); 
     // Seek to the previous message 
     consumer.seek(partition,offset - 1); 
    } 

    // Now get the last message 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records) { 
     lastKey = record.key(); 
    } 
    consumer.close(); 

これは予期された動作ですか、間違ったパスですか?

答えて

1

link apicommitted方法はすなわち、特定のパーティションにthe last committed offsetを取得することであるいう問題は、ラインfinal long offset = consumer.committed(partition).offset()である:最後はそれがすでに読んだあなたの消費者のtellカフカサーバーを相殺しました。 あなたはいつも特定のオフセットから読み込むので、間違いなくmessages replayedが得られます。 私はブロックのための最初を取り除くだけでよいと思うので。

+0

もちろん、Consumer.positionの代わりに – ryanpudd

+0

@ryanpuddをチェックしてください。私はスパークプログラムで同じことを達成しようとしていますが、できません。完全なコードへのリンクを提供してください。ありがとう –

関連する問題