0
私は反応性カフカモジュールからのコードで動作する必要があります。https://github.com/akka/reactive-kafka/blob/master/README.mdパブリッシャーを使用してメッセージを生成するには(reactive-kafkaで)?
私のコードで始まる:
val kafka = new ReactiveKafka()
val kafkaIdpsMsgs: Publisher[StringKafkaMessage] = kafka.consume(
ConsumerProperties(
brokerList = kafkaHosts,
zooKeeperHost = zkHosts,
topic = "test",
groupId = "idps-translator",
decoder = new StringDecoder()
).readFromEndOfStream())
val kafkaSamples: Subscriber[String] = kafka.publish(ProducerProperties(
brokerList = kafkaHosts,
topic = "test",
encoder = new StringEncoder()
))
私は(出版社による)メッセージを生成します。それを実現させるために書かなければならないコードは何ですか?
、あなたのプロジェクトでアッカとアッカの反応Streamsを使用していますか?まずAkkaリアクティブストリームを読み、理解します。 ReactiveKafkaを使用することができます。あなたのプロジェクトにAkkaのリアクティブ・ストリームが必要ない場合、Reactive Kafkaはあなたのためには適していません。 –
私は@ SarveshKumarSinghに同意します。 'reactive-kafka'は' Apache KafkaのためのAkkaストリームコネクタ 'です。 'akka-streams'(http://doc.akka.io/docs/akka/2.4/scala/stream/index.html)を使っていますか? – mfirry