gnipからツイートストリームを読み込んでKafakに書き込むスパークストリーミングジョブがあります。スパークストリーミングジョブは約1時間実行した後に終了します
スパークとカフカが同じクラスタで実行されています。
私のクラスタは5つのノードで構成されています。 Kafka-b01 ... Kafka-b05
Kafak-b05でスパークマスターが動作しています。ここで
は、私たちが火花ジョブ
nohupをshを$ SPZRK_HOME/binに/火花提出--total-エグゼキュータ・コア5 --class com.test.java.gnipStreaming.GnipSparkStreamer --masterを提出する方法ですスパーク://カフカ-B05:7077 GnipStreamContainer.jar powertrackのカフカ-B01、カフカ-B02、カフカ-B03、カフカ-B04、カフカ-B05 gnip_live_stream 2 &
スパークジョブが
を殺さ約1時間後にnohubファイルのログには、次の例外があります。
org.apache.spark.storage.BlockFetchException: Failed to fetch block from 2 locations. Most recent failure cause:
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:630)
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:48)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel
at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455)
at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306)
at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134)
at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:211)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
... 15 more
Caused by: io.netty.channel.ChannelException: Failed to open a socket.
at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62)
at io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:72)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:453)
... 26 more
Caused by: java.net.SocketException: Too many open files
at sun.nio.ch.Net.socket0(Native Method)
at sun.nio.ch.Net.socket(Net.java:411)
at sun.nio.ch.Net.socket(Net.java:404)
at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105)
at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:60)
... 33 more
開いているファイルの最大数を3275782に増やしました(古い数字はこの数字のほぼ半分でした)が、私はまだ同じ問題に直面しています。
スパークウェブインターフェイスからワーカーのログstderr
をチェックしたところ、別の例外が見つかりました。
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152)
at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.test.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:59)
at com.test.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:51)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
第2の例外(それはそうであるように見える)は、カフカがスパークしないことに関連しています。
問題は何と思いますか?ここでのYuval Itzchakovからのコメントに基づいて
EDIT
はストリーマ
メインクラスhttp://pastebin.com/EcbnQQ3a
顧客レシーバクラスhttp://pastebin.com/3UFPktKR
カフカに書き込むコードを表示する必要があります。同時接続数は、カフカで設定したトピックごとのパーティション数を超えてはいけません。ファイルハンドルの数を増やすことはパッチにすぎません。コードで何か問題が起きている可能性が最も高いです。 –
Thnx Yuval、コード用に2つのリンクを追加しました。ちなみに、ローカルスパークのインストールに同じ仕事を提出すると、問題なく正常に動作します。私はこの問題の根本原因となる同じSparkクラスター上のカフカの存在を疑う。どう思いますか? – Fanooos