2017-09-28 3 views
1

akkaストリームを使用してkafkaからのメッセージの単純なコンシューマを書き込もうとしています。タイムアウト後にコンシューマがWakeupExceptionで中断されました。メッセージ:null。 akka.kafka.consumer.wakeup-timeoutの現在の値は3000ミリ秒です

build.sbt

"com.typesafe.akka" %% "akka-stream-kafka" % "0.17" 

私のコード

object AkkaStreamskafka extends App { 

    // producer settings 
    implicit val system = ActorSystem() 
    implicit val actorMaterializer = ActorMaterializer() 
    val consumerSettings = ConsumerSettings(system, Some(new ByteArrayDeserializer), Some(new StringDeserializer)) 
     .withBootstrapServers("foo:9092") 
     .withGroupId("abhi") 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") 

    val source = Consumer 
     .committableSource(consumerSettings, Subscriptions.topics("my-topic)) 
    val flow = Flow[ConsumerMessage.CommittableMessage[Array[Byte], String]].mapAsync(1){msg => 
     msg.committableOffset.commitScaladsl().map(_ => msg.record.value); 
     } 
    val sink = Sink.foreach[String](println) 
    val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder => 
     s => 
     import GraphDSL.Implicits._ 
     source ~> flow ~> s.in 
     ClosedShape 
    }) 
    val future = graph.run() 
    Await.result(future, Duration.Inf) 
} 

しかし、私はエラーを取得

[WARN] [09/28/2017 13:12:52.333] [default-akka.kafka.default-dispatcher-7] 
[akka://default/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. 
Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds 

編集:

私はssh fooを行い、その後、入力することができます次の共同mmandサーバの端末で./kafka-console-consumer --zookeeper localhost:2181 --topic my-topicと私はデータを見ることができます。だから、私のサーバ名はfooであり、kafkaは起動していて、そのマシンで動いていると思います。

EDIT2:

カフカサーバー上では、私は、Clouderaの5.7.1を実行しています。カフカ版はjars/kafka_2.10-0.9.0-kafka-2.0.0.jar

答えて

0

私はこの問題を自分で解決できました。

ライブラリ"com.typesafe.akka" %% "akka-stream-kafka"は、カフカ0.10以降でのみ動作します。以前のバージョンのKafkaではうまくいきません。私がカフカのサーバをカフカサーバに載せたとき、私はKafka 0.9に付属のCloudera 5.7.1を使用していることがわかりました。

このバージョンのAkka Streamsソースを作成するためです。私は、彼らはまた、このコードは私

implicit val actorSystem = ActorSystem() 
implicit val actorMaterializer = ActorMaterializer() 
val kafka = new ReactiveKafka() 
val consumerProperties = ConsumerProperties(
    bootstrapServers = "foo:9092", 
    topic = "my-topic", 
    groupId = "abhi", 
    valueDeserializer = new StringDeserializer() 
) 

val source = Source.fromPublisher(kafka.consume(consumerProperties)) 
val flow = Flow[ConsumerRecord[Array[Byte], String]].map(r => r.value()) 
val sink = Sink.foreach[String](println) 
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) {implicit builder => 
s => 
    import GraphDSL.Implicits._ 
    source ~> flow ~> s.in 
    ClosedShape 
}) 
val future = graph.run() 
future.onComplete{_ => 
    actorSystem.terminate() 
} 
Await.result(actorSystem.whenTerminated, Duration.Inf) 
のために完璧に働いていたが https://github.com/kciesielski/reactive-kafka

例を持って

"com.softwaremill.reactivekafka" % "reactive-kafka-core_2.11" % "0.10.0" 

を使用するために必要な