2017-02-12 8 views
1

ケーキ溶液Akka client for scala and Kafkaを使用しています。 KafkaProducerActor俳優を作成していて、askパターンを使用してメッセージを送信しようとしていますが、将来、いくつかの操作を実行しますが、毎回askタイムアウト例外に直面しています。以下は私のコードです:後Apache Kafka:KafkaProducerActorが例外ASkタイムアウトをスローします。

class SimpleAkkaProducer (config: Config, system: ActorSystem) { 

    private val producerConf = KafkaProducer. 
    Conf(config, 
     keySerializer = new StringSerializer, 
     valueSerializer = new StringSerializer) 

    val actorRef = system.actorOf(KafkaProducerActor.props(producerConf)) 

    def sendMessageWayOne(record: ProducerRecords[String, String]) = { 
    actorRef ! record 
    } 

    def sendMessageWayTwo(record: ProducerRecords[String, String]) = { 
    implicit val timeout = Timeout(100.seconds) 
    val future = (actorRef ? record).mapTo[String] 
    future onComplete { 
     case Success(data) => println(s" >>>>>>>>>>>> ${data}") 
     case Failure(ex) => ex.printStackTrace() 
    } 
    } 
} 

object SimpleAkkaProducer { 
    def main(args: Array[String]): Unit = { 
    val system = ActorSystem("KafkaProducerActor") 
    val config = ConfigFactory.defaultApplication() 
    val simpleAkkaProducer = new SimpleAkkaProducer(config, system) 

    val topic = config.getString("akka.topic") 
    val messageOne = ProducerRecords.fromKeyValues[String, String](topic, 
     Seq((Some("Topics"), "First Message")), None, None) 

    simpleAkkaProducer.sendMessageWayOne(messageOne) 
    simpleAkkaProducer.sendMessageWayTwo(messageOne) 
    } 
} 

は例外です:あなたはNone以外のものであることをProducerRecordssuccessResponsefailureResponse値を指定した場合

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://KafkaProducerActor/user/$a#-1520717141]] after [100000 ms]. Sender[null] sent message of type "cakesolutions.kafka.akka.ProducerRecords". 
    at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604) 
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) 
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:864) 
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109) 
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103) 
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:862) 
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) 
    at java.lang.Thread.run(Thread.java:745) 
+0

は 'KafkaProducerActor'の定義を追加します。 –

+0

'KafkaProducerActor'はapiのデザインで、https://github.com/cakesolutions/scala-kafka-client/blob/1cbeccbb183ca06585f4b9fb1e366048e993ac51/akka/src/main/scala/cakesolutions/kafka/akka/KafkaProducerActor.scala –

+0

のように実装されていますちょっと@YuvalItzchakovあなたはこれのための任意の解決策を見つけることがありますか? –

答えて

2

プロデューサーの俳優は、送信者に応答します。カフカの書き込みが成功し、カフカの書き込みが失敗したときfailureResponse値が返送されたときにsuccessResponse値は、送信者に返送されます。

例:

val record = ProducerRecords.fromKeyValues[String, String](
    topic = topic, 
    keyValues = Seq((Some("Topics"), "First Message")), 
    successResponse = Some("success"), 
    failureResponse = Some("failure") 
) 

val future = (actorRef ? record).mapTo[String] 
future onComplete { 
    case Success("success") => println("Send succeeded!") 
    case Success("failure") => println("Send failed!") 
    case Success(data) => println(s"Send result: $data") 
    case Failure(ex) => ex.printStackTrace() 
} 
+0

いいえ、@Jaakko、これは意味があります、これは動作していますが、それでもメッセージを送信した後で 'RrecordMetaData'をどのように取得できますか? –

+0

残念ながら、現在のバージョンのプロデューサーアクターで 'RecordMetaData'を得る方法はありません。パッチや提案はプロジェクトページで歓迎します。 :) –

+0

お手伝いをしてくれてありがとう@Jaakko。 –

関連する問題