2016-04-24 21 views
2

gnipからツイートストリームを読み込んでKafakに書き込むスパークストリーミングジョブがあります。スパークストリーミングジョブは約1時間実行した後に終了します

スパークとカフカが同じクラスタで実行されています。

私のクラスタは5つのノードで構成されています。 Kafka-b01 ... Kafka-b05

Kafak-b05でスパークマスターが動作しています。ここで

は、私たちが火花ジョブ

nohupをshを$ SPZRK_HOME/binに/火花提出--total-エグゼキュータ・コア5 --class com.test.java.gnipStreaming.GnipSparkStreamer --masterを提出する方法ですスパーク://カフカ-B05:7077 GnipStreamContainer.jar powertrackのカフカ-B01、カフカ-B02、カフカ-B03、カフカ-B04、カフカ-B05 gnip_live_stream 2 &

スパークジョブが

を殺さ約1時間後に

nohubファイルのログには、次の例外があります。

org.apache.spark.storage.BlockFetchException: Failed to fetch block from 2 locations. Most recent failure cause: 
     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595) 
     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585) 
     at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570) 
     at org.apache.spark.storage.BlockManager.get(BlockManager.scala:630) 
     at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:48) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel 
     at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455) 
     at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306) 
     at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134) 
     at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) 
     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:211) 
     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) 
     at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90) 
     at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) 
     at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) 
     at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99) 
     at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89) 
     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588) 
     ... 15 more 
Caused by: io.netty.channel.ChannelException: Failed to open a socket. 
     at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62) 
     at io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:72) 
     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 
     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
     at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
     at java.lang.Class.newInstance(Class.java:442) 
     at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:453) 
     ... 26 more 
Caused by: java.net.SocketException: Too many open files 
     at sun.nio.ch.Net.socket0(Native Method) 
     at sun.nio.ch.Net.socket(Net.java:411) 
     at sun.nio.ch.Net.socket(Net.java:404) 
     at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105) 
     at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60) 
     at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:60) 
     ... 33 more 

開いているファイルの最大数を3275782に増やしました(古い数字はこの数字のほぼ半分でした)が、私はまだ同じ問題に直面しています。

スパークウェブインターフェイスからワーカーのログstderrをチェックしたところ、別の例外が見つかりました。

java.nio.channels.ClosedChannelException 
     at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) 
     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) 
     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74) 
     at kafka.producer.SyncProducer.send(SyncProducer.scala:119) 
     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) 
     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) 
     at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) 
     at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188) 
     at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152) 
     at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151) 
     at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) 
     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) 
     at kafka.producer.Producer.send(Producer.scala:77) 
     at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
     at com.test.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:59) 
     at com.test.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:51) 
     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225) 
     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) 
     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

第2の例外(それはそうであるように見える)は、カフカがスパークしないことに関連しています。

問題は何と思いますか?ここでのYuval Itzchakovからのコメントに基づいて

EDIT

はストリーマ

メインクラスhttp://pastebin.com/EcbnQQ3a

顧客レシーバクラスhttp://pastebin.com/3UFPktKR

+0

カフカに書き込むコードを表示する必要があります。同時接続数は、カフカで設定したトピックごとのパーティション数を超えてはいけません。ファイルハンドルの数を増やすことはパッチにすぎません。コードで何か問題が起きている可能性が最も高いです。 –

+0

Thnx Yuval、コード用に2つのリンクを追加しました。ちなみに、ローカルスパークのインストールに同じ仕事を提出すると、問題なく正常に動作します。私はこの問題の根本原因となる同じSparkクラスター上のカフカの存在を疑う。どう思いますか? – Fanooos

答えて

1

問題がありますその」のコードでありますDStream.foreachPartitionの反復でProducerという新しいインスタンスをインスタンス化します。データ集約型のストリームがある場合は、多くのプロデューサが割り当てられ、Kafkaに接続しようとする可能性があります。

私は確信して作ると思います最初の事はあなたがfinallyブロックを使用してデータを送信し、producer.closeを呼ん完了したら、あなたが適切にストリームをクローズしているということです。

public Void call(JavaRDD<String> rdd) throws Exception { 
    rdd.foreachPartition(new VoidFunction<Iterator<String>>() { 

     @Override 
     public void call(Iterator<String> itr) throws Exception { 
          try 
          { 
       Producer<String, String> producer = getProducer(hosts); 
       while(itr.hasNext()) { 
       try { 
        KeyedMessage<String, String> message = 
         new KeyedMessage<String, String>(topic, itr.next()); 
        producer.send(message); 
        } catch (Exception e) { 
        e.printStackTrace(); 
        } 
       } finally { 
            producer.close() 
           } 
     } 
    }); 
    return null; 
} 

それがまだない場合私はカフカプロデューサのためのオブジェクトプールを作成し、オンデマンドでプールすることができます。このようにして、使用可能なプロデューサの数と開いているソケットの数を明示的に制御します。

+1

私はproducer.close()行を追加しました。ジョブは14時間前にスムーズに動作しています。ありがとうございました。 – Fanooos

+0

Yuval、プロデューサーを閉じても、ジョブは常に実行されますが、現在、作業者の標準エラーは「kafka.producer.ProducerClosedException:プロデューサーはすでに閉じています」例外で満ちています。あなたはこれについて何か考えていますか? – Fanooos

+0

finallyブロックで閉じていますか? –