2017-03-06 2 views
0

私はおそらくカフカ消費者のポイントを逃したが、私がやりたいことはあるのです:カフカのトピックから利用可能なすべてのメッセージが消費された後に、どのように将来のメッセージリストを返すのですか?

消費者は、トピックにサブスクライブするトピック内のすべてのメッセージをつかみ、それらのメッセージ

のすべてのリストを将来を返します

私が試してみて、これを達成するために書かれているコードは、それが必要なメッセージを消費した後、繰り返しトピックをポーリングし続け、将来はしかし決して戻らない

val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) => 
list :+ kafkaMessage 
} 

def consume(topic: String) = 
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic)) 
    .map { message => 
    logger.info(s"Consuming ${message.record.value}") 
    KafkaMessage(Some(message.record.key()), Some(message.record.value())) 
    } 
    .buffer(bufferSize, overflowStrategy) 
    .runWith(sink) 

です。未来を返し、消費者を閉じさせる方法はありますか?

答えて

1

Kafkaはストリーミングデータ用であるため、新しいデータをいつでもトピックに追加できるため、「すべてのメッセージ」はありません。あなたが経由の「ログの現在の末尾を」取得する必要がありますどのように多くのレコードの最後のpollで返して終了してしまったか

    1. チェック:

      私は推測、あなたができる2つの可能なものがありますendOffsetsこれを、パーティションごとの最新レコードのオフセットと比較してください。両方が一致すれば、あなたは戻ることができます。

    最初のアプローチは簡単ですが、2番目のアプローチほど信頼性が低いという欠点があります。理論的には、利用可能なレコードがあっても(この変更があまり起こっていなくても)投票がゼロのレコードを返す可能性があります。

    Scalaでこの終了条件をどのように表現するかはわかりません(私はScalaに詳しくはないので)。

  • 関連する問題