私はカフカを初め、独自のストリーミングサービスをカフカに接続するためのプロトタイプを作成しています。最後のメッセージをカフカのトピックに送信
社内のストリームコンシューマは、接続時に最後に受信したメッセージのIDでログオンする必要があるため、トピックに送信された最後のメッセージのキーを取得したいと考えています。
KafkaProducerまたはKafkaConsumerを使用してこれを行うことは可能ですか?
コンシューマを使用して次の操作を実行しようとしましたが、コンソールコンシューマも実行しているときに、再生されたメッセージが表示されます。
// Poll so we know we're connected
consumer.poll(100);
// Get the assigned partitions
Set<TopicPartition> assignedPartitions = consumer.assignment();
// Seek to the end of those partitions
consumer.seekToEnd(assignedPartitions);
for(TopicPartition partition : assignedPartitions) {
final long offset = consumer.committed(partition).offset();
// Seek to the previous message
consumer.seek(partition,offset - 1);
}
// Now get the last message
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
lastKey = record.key();
}
consumer.close();
これは予期された動作ですか、間違ったパスですか?
もちろん、Consumer.positionの代わりに – ryanpudd
@ryanpuddをチェックしてください。私はスパークプログラムで同じことを達成しようとしていますが、できません。完全なコードへのリンクを提供してください。ありがとう –