私はKafkaでSpark Streamingジョブを実行しようとしています。Kafka + Sparkストリーミング:ClosedChannelException
私はSpark Streamingアプリケーションにメッセージを公開するために、カフカ経由でcsvログファイルを送信します。
私はこれを達成するためにSpark Streamingアプリケーションで直接アプローチを使用しています。
私のログファイルのデータは、最初はうまくいきましたが、しばらくしてから、私のScala IDEに次のエラーメッセージが表示されます。
環境:Sparkをすべてのコアでローカルに実行しています。 Zookeeper、Kafkaも私のシステム上でローカルに実行されています。
ERROR:
16/09/05 17:53:28 ERROR Executor: Exception in task 0.0 in stage 390.0 (TID 390)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
何が起こるかというと、この例外は途中で投げ、時にはログデータの束がコンソールにこのエラーメッセージを次のされています。
"Closed Channel Exception"というネットワークエラーのように見えますが、私はこれらのプロセスをすべてローカルで実行しているので、実際に根本的な原因があるのだろうかと思います。
私はこの問題を解決するためのいくつかの指針を得ることができたら嬉しいです。
下記の回答はあなたのためにありますか? –
これには解決策がありますか? – dirceusemighini
@dirceusemighini投稿からの回答を参照してくださいhttp://stackoverflow.com/questions/35807844/kafka-consumer-simpleconsumer-reconnect-due-to-socket-error-java-nio-channels/43105342#43105342、それは役に立ちます。 – Nietzsche