2016-08-23 12 views
0

私は反応性カフカモジュールからのコードで動作する必要があります。https://github.com/akka/reactive-kafka/blob/master/README.mdパブリッシャーを使用してメッセージを生成するには(reactive-kafkaで)?

私のコードで始まる:

val kafka = new ReactiveKafka() 

val kafkaIdpsMsgs: Publisher[StringKafkaMessage] = kafka.consume(
     ConsumerProperties(
     brokerList = kafkaHosts, 
     zooKeeperHost = zkHosts, 
     topic = "test", 
     groupId = "idps-translator", 
     decoder = new StringDecoder() 
    ).readFromEndOfStream()) 

    val kafkaSamples: Subscriber[String] = kafka.publish(ProducerProperties(
     brokerList = kafkaHosts, 
     topic = "test", 
     encoder = new StringEncoder() 
    )) 

私は(出版社による)メッセージを生成します。それを実現させるために書かなければならないコードは何ですか?

+0

、あなたのプロジェクトでアッカとアッカの反応Streamsを使用していますか?まずAkkaリアクティブストリームを読み、理解します。 ReactiveKafkaを使用することができます。あなたのプロジェクトにAkkaのリアクティブ・ストリームが必要ない場合、Reactive Kafkaはあなたのためには適していません。 –

+0

私は@ SarveshKumarSinghに同意します。 'reactive-kafka'は' Apache KafkaのためのAkkaストリームコネクタ 'です。 'akka-streams'(http://doc.akka.io/docs/akka/2.4/scala/stream/index.html)を使っていますか? – mfirry

答えて

関連する問題