1

私はKafkaでSpark Streamingジョブを実行しようとしています。Kafka + Sparkストリーミング:ClosedChannelException

私はSpark Streamingアプリケーションにメッセージを公開するために、カフカ経由でcsvログファイルを送信します。

私はこれを達成するためにSpark Streamingアプリケーションで直接アプローチを使用しています。

私のログファイルのデータは、最初はうまくいきましたが、しばらくしてから、私のScala IDEに次のエラーメッセージが表示されます。

環境:Sparkをすべてのコアでローカルに実行しています。 Zookeeper、Kafkaも私のシステム上でローカルに実行されています。

ERROR:

16/09/05 17:53:28 ERROR Executor: Exception in task 0.0 in stage 390.0 (TID 390) 
java.nio.channels.ClosedChannelException 
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) 
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) 
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) 
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) 
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) 
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 

何が起こるかというと、この例外は途中で投げ、時にはログデータの束がコンソールにこのエラーメッセージを次のされています。

"Closed Channel Exception"というネットワークエラーのように見えますが、私はこれらのプロセスをすべてローカルで実行しているので、実際に根本的な原因があるのだろうかと思います。

私はこの問題を解決するためのいくつかの指針を得ることができたら嬉しいです。

+0

下記の回答はあなたのためにありますか? –

+0

これには解決策がありますか? – dirceusemighini

+0

@dirceusemighini投稿からの回答を参照してくださいhttp://stackoverflow.com/questions/35807844/kafka-consumer-simpleconsumer-reconnect-due-to-socket-error-java-nio-channels/43105342#43105342、それは役に立ちます。 – Nietzsche

答えて

0

ローカルホストをkafka producer.properties config(メタデータ.broker.listなど)のマシンIPに置き換えます。また、/ etcにある/ホストは、置き換えるファイル:

127.0.0.1のローカルホストlocalhost.localdomainを

でx.x.x.xはlocalhost localhost.localdomainをx.x.x.xは、あなたのマシンのIPである

。 役立つかどうかを確認してください。

+0

こんにちは、 ご意見ありがとうございます。私はあなたが言ったことを試みたが、残念ながらそれはまだ同じエラーを与えている。 私のKafkaはWindows 10にインストールされているので、マシン上で/ etc/hostsを見つけることができませんでした。しかし、私は指示どおりにproducer.propertiesファイルを変更しました。 metadata.broker.list = xx.xx.xx.xx:xxxx – Amna

関連する問題