akkaストリームを使用してkafkaからのメッセージの単純なコンシューマを書き込もうとしています。タイムアウト後にコンシューマがWakeupExceptionで中断されました。メッセージ:null。 akka.kafka.consumer.wakeup-timeoutの現在の値は3000ミリ秒です
build.sbt
"com.typesafe.akka" %% "akka-stream-kafka" % "0.17"
私のコード
object AkkaStreamskafka extends App {
// producer settings
implicit val system = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, Some(new ByteArrayDeserializer), Some(new StringDeserializer))
.withBootstrapServers("foo:9092")
.withGroupId("abhi")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
val source = Consumer
.committableSource(consumerSettings, Subscriptions.topics("my-topic))
val flow = Flow[ConsumerMessage.CommittableMessage[Array[Byte], String]].mapAsync(1){msg =>
msg.committableOffset.commitScaladsl().map(_ => msg.record.value);
}
val sink = Sink.foreach[String](println)
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder =>
s =>
import GraphDSL.Implicits._
source ~> flow ~> s.in
ClosedShape
})
val future = graph.run()
Await.result(future, Duration.Inf)
}
しかし、私はエラーを取得
[WARN] [09/28/2017 13:12:52.333] [default-akka.kafka.default-dispatcher-7]
[akka://default/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout.
Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
編集:
私はssh foo
を行い、その後、入力することができます次の共同mmandサーバの端末で./kafka-console-consumer --zookeeper localhost:2181 --topic my-topic
と私はデータを見ることができます。だから、私のサーバ名はfoo
であり、kafkaは起動していて、そのマシンで動いていると思います。
EDIT2:
カフカサーバー上では、私は、Clouderaの5.7.1を実行しています。カフカ版はjars/kafka_2.10-0.9.0-kafka-2.0.0.jar