2017-09-15 4 views
1

SparkアプリケーションをSparkマスターに送信しようとしています。マスターと複数のスレーブはOpenShift環境で動作しており、SparkマスターのWeb UIには接続中のワーカーが表示されています。(質問タイトル: Spark: クラスタモードでOpenShiftにデプロイするときのStreamCorruptedException)

アプリケーションジャーは、すべてのスパークポッドで/jarsに展開されています。

これが私の送信(submit)スクリプトです:

spark-submit2.cmd --conf "spark.driver.extraClassPath=/jars" 
        --conf "spark.executor.extraClassPath=/jars" 
        --conf "spark.submit.deployMode=cluster" 
        --master spark://******:31824 
        --class Main 'local:/jars/SparkHelloWorld-1.0-SNAPSHOT.jar' 

アプリケーション自体は単純です:

Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult: 
     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) 
     at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100) 
     at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108) 
     at org.apache.spark.deploy.Client$$anonfun$7.apply(Client.scala:233) 
     at org.apache.spark.deploy.Client$$anonfun$7.apply(Client.scala:233) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
     at org.apache.spark.deploy.Client$.main(Client.scala:233) 
     at org.apache.spark.deploy.Client.main(Client.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
     at java.lang.reflect.Method.invoke(Unknown Source) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.RuntimeException: java.io.StreamCorruptedException: invalid stream header: 01000D31 
     at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:857) 
     at java.io.ObjectInputStream.<init>(ObjectInputStream.java:349) 
     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63) 
     at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63) 
     at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122) 
     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:107) 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:259) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
     at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:308) 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:258) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
     at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:257) 
     at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:577) 
     at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:562) 
     at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:159) 
     at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107) 
     at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) 
     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) 
     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) 
     at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) 
     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911) 
     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) 
     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) 
     at java.lang.Thread.run(Thread.java:748) 

     at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:207) 
     at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) 
     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) 
     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) 
     at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) 
     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911) 
     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566) 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131) 
     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) 
     at java.lang.Thread.run(Unknown Source) 

/**
 * Minimal "SparkPi" example: estimates Pi with a Monte Carlo simulation
 * distributed over a Spark cluster.
 *
 * <p>Usage: {@code Main [slices]} where {@code slices} is the optional
 * number of RDD partitions (defaults to 4).
 */
public class Main {

    // Spark master URL. NOTE(review): this is also passed via --master on
    // spark-submit; hard-coding it here can conflict with the submit flag in
    // cluster deploy mode — consider dropping setMaster() and letting the
    // submit script decide. TODO confirm intended precedence.
    private static final String MASTER = "spark://******:31824";

    public static void main(String[] args) throws Exception {

        // Create a SparkContext to initialize the application.
        SparkConf conf = new SparkConf()
                .setMaster(MASTER)
                .setAppName("SparkPi");

        // Create a Java version of the Spark Context.
        JavaSparkContext sc = new JavaSparkContext(conf);

        try {
            // Number of partitions: first CLI arg if given, otherwise 4.
            final int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 4;
            final int n = 100000 * slices;

            // Pre-size the list to avoid repeated internal array growth.
            final List<Integer> l = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                l.add(i);
            }

            final JavaRDD<Integer> dataSet = sc.parallelize(l, slices);

            // Monte Carlo estimate: for each sample draw a random point in
            // [-1, 1] x [-1, 1] and count those inside the unit circle.
            final int count = dataSet.map(integer -> {
                double x = Math.random() * 2 - 1;
                double y = Math.random() * 2 - 1;
                return (x * x + y * y < 1) ? 1 : 0;
            }).reduce((a, b) -> a + b);

            // Area ratio circle/square = pi/4, hence the factor of 4.
            System.out.println("Pi is roughly " + 4.0 * count / n);
        } finally {
            // Fix: the original never released the context; stop it so the
            // application shuts down cleanly even if the job throws.
            sc.stop();
        }
    }
}

このスクリプトを実行するたびに、次の例外が発生します。

Sparkのドキュメントではこの問題に関する情報を見つけられませんでした。何か見落としているのでしょうか?

答えて

2

これは、マスタとスレーブの異なるSparkバージョンに関連している可能性があります。

1

これは次のいずれかが原因で起こる可能性があります:

  1. コードで使用している Spark のバージョンが、クラスタのものと異なる。
  2. 送信(呼び出し)に使用した Spark のバージョンと、クラスタで実行されている Spark のバージョンの不一致。

私の場合は、Scala 2.11 でビルドされた Spark が動作しているクラスタに対して、Scala 2.10 でビルドされた Spark の依存関係を使用していました。誰かの助けになることを願っています。乾杯!

関連する問題