私はおそらくカフカ消費者のポイントを逃したが、私がやりたいことはあるのです:カフカのトピックから利用可能なすべてのメッセージが消費された後に、どのように将来のメッセージリストを返すのですか?
消費者は、トピックにサブスクライブするトピック内のすべてのメッセージをつかみ、それらのメッセージ
のすべてのリストを将来を返します私が試してみて、これを達成するために書かれているコードは、それが必要なメッセージを消費した後、繰り返しトピックをポーリングし続け、将来はしかし決して戻らない
val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
list :+ kafkaMessage
}
def consume(topic: String) =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.map { message =>
logger.info(s"Consuming ${message.record.value}")
KafkaMessage(Some(message.record.key()), Some(message.record.value()))
}
.buffer(bufferSize, overflowStrategy)
.runWith(sink)
です。未来を返し、消費者を閉じさせる方法はありますか?