2016-04-02 7 views
2

私はソケットにスパークストリーミングとリッスンを試みていますが、私は受信機とDStreamを作成するためにrawSocketStreamメソッドを使用しています。しかし、DStreamを印刷すると、以下の例外が発生します。スパークストリーミングrawSocketStream

コードDSTREAMを作成するには:

JavaSparkContext jsc = new JavaSparkContext("Master", "app"); 
JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Seconds(3)); 
JavaReceiverInputDStream<Object> rawStream = jssc.rawSocketStream("localhost", 9999); 
log.info(tracePrefix + "Created the stream ..."); 
rawStream.print(); 
jssc.start(); 
jssc.awaitTermination(); 

コードTCP接続を介してprotobugオブジェクトを送信するために:

FileInputStream input = new FileInputStream("address_book"); 
AddressBook book = AddressBookProtos.AddressBook.parseFrom(input); 
log.info(tracePrefix + "Size of contacts: " + book.getPersonList().size()); 

ServerSocket serverSocket = new ServerSocket(9999); 
log.info(tracePrefix + "Waiting for connections ..."); 
Socket s1 = serverSocket.accept(); 
log.info(tracePrefix + "Accepted a connection ..."); 
while(true) { 
    Thread.sleep(3000); 
    ObjectOutputStream out = new ObjectOutputStream(s1.getOutputStream()); 
    out.writeByte(book.getSerializedSize()); 
    out.write(book.toByteArray()); 
    out.flush(); 
    log.info(tracePrefix + "Written to new socket"); 
} 

スタックトレースを以下に示します。

java.lang.IllegalArgumentException 
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) 
    at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 


2016-04-02 07:45:47,607 ERROR [Executor task launch worker-0] org.apache.spark.streaming.receiver.ReceiverSupervisorImpl 
Stopped receiver with error: java.lang.IllegalArgumentException 

2016-04-02 07:45:47,613 ERROR [Executor task launch worker-0] org.apache.spark.executor.Executor 
Exception in task 0.0 in stage 0.0 (TID 0) 

java.lang.IllegalArgumentException 
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) 
    at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
2016-04-02 07:45:47,646 ERROR [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager 
Task 0 in stage 0.0 failed 1 times; aborting job 

2016-04-02 07:45:47,656 ERROR [submit-job-thread-pool-0] org.apache.spark.streaming.scheduler.ReceiverTracker 
Receiver has been stopped. Try to restart it. 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalArgumentException 
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) 
    at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
Caused by: java.lang.IllegalArgumentException 
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) 
    at org.apache.spark.streaming.dstream.RawNetworkReceiver.onStart(RawInputDStream.scala:88) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

の作業コード

コードTCP

 ServerSocket serverSocket = new ServerSocket(9999); 
     log.info(tracePrefix + "Waiting for connections ..."); 
     Socket s1 = serverSocket.accept(); 
     log.info(tracePrefix + "Accepted a connection ..."); 
     while(true) { 
      Thread.sleep(3000); 
      DataOutputStream out = new DataOutputStream(s1.getOutputStream()); 
      byte[] bytes = book.toByteArray(); 
      log.info(tracePrefix + "Serialized size: " + book.getSerializedSize()); 
      out.writeInt(book.getSerializedSize()); 
      log.info(tracePrefix + "Sending bytes: " + Arrays.toString(bytes)); 
      out.write(bytes); 
//   out.write("hello world !".getBytes()); 
      out.flush(); 
      log.info(tracePrefix + "Written to new socket"); 
     } 

いるProtobufオブジェクトを送信0コードは、受信機とDSTREAM

JavaReceiverInputDStream<GeneratedMessage> rawStream = jssc.receiverStream(new JavaSocketReceiver("localhost", 9999)); 
log.info(tracePrefix + "Created the stream ..."); 
rawStream.print(); 

private static class JavaSocketReceiver extends Receiver<GeneratedMessage> { 

     /** 
     * 
     */ 
     private static final long serialVersionUID = -958378677169958045L; 
     String host = null; 
     int port = -1; 

     JavaSocketReceiver(String host_, int port_) { 
      super(StorageLevel.MEMORY_AND_DISK()); 
      host = host_; 
      port = port_; 
     } 

     @Override 
     public void onStart() { 
      new Thread() { 
       @Override 
       public void run() { 
        receive(); 
       } 
      }.start(); 
     } 

     @Override 
     public void onStop() { 
     } 

     private void receive() { 
      try { 
       Socket socket = null; 
       ObjectInputStream in = null; 
       try {     
        // Open a socket to the target address and keep reading from 
        // it 
        log.info(tracePrefix + "Connecting to " + host + ":" + port); 
        SocketChannel channel = SocketChannel.open(); 
        channel.configureBlocking(true); 
        channel.connect(new InetSocketAddress(host, port)); 
        log.info(tracePrefix + "Connected to " + host + ":" + port); 

        ArrayBlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(2); 

        Thread blockPushingThread = new Thread(new Runnable() { 

         @Override 
         public void run() { 
          int nextBlockNumber = 0; 
          while (true) { 
           try { 
            ByteBuffer buffer = queue.take(); 
            nextBlockNumber += 1; 
            AddressBook book = AddressBook.parseFrom(buffer.array()); 
//         log.info(tracePrefix + "Got back the object: " + book); 
            store(book); 
           } catch (InterruptedException ie) { 
            log.error(tracePrefix + "Failed processing data", ie); 
           } catch (Throwable t) { 
            log.error(tracePrefix + "Failed processing data", t); 
           } 
          } 
         } 
        }); 
        blockPushingThread.setDaemon(true); 

        blockPushingThread.start(); 

        ByteBuffer lengthBuffer = ByteBuffer.allocate(4); 
        while (true) { 
         lengthBuffer.clear(); 
         readFully(channel, lengthBuffer); 
         lengthBuffer.flip(); 
         int length = lengthBuffer.getInt(); 
//      log.info(tracePrefix + "The length read: " + length); 
         ByteBuffer dataBuffer = ByteBuffer.allocate(length); 
         readFully(channel, dataBuffer); 
         dataBuffer.flip(); 
//      log.info(tracePrefix + "Read a block with " + length + " bytes"); 
         queue.put(dataBuffer); 
        } 


       } finally { 
        Closeables.close(in, /* swallowIOException = */ true); 
        Closeables.close(socket, /* swallowIOException = */ true); 
       } 
      } catch (ConnectException ce) { 
       ce.printStackTrace(); 
       restart("Could not connect", ce); 
      } catch (Throwable t) { 
       t.printStackTrace(); 
       restart("Error receiving data", t); 
      } 
     } 

     private void readFully(ReadableByteChannel channel, ByteBuffer dest) { 
      while (dest.position() < dest.limit()) { 
       try { 
        if (channel.read(dest) == -1) { 
         throw new EOFException("End of channel"); 
        } 
       } catch (IOException e) { 
        log.error(tracePrefix + "Failed reading from channel: " + channel, e); 
       } 
      } 
     } 
    } 

上記JavaSocketReceiverスパークストリーミングモジュールのrawSocketStreamから取られを作成します。私がバイトを送信しているクライアントコードで、DataOutputStreamをObjectOutputStreamに変更すると、破損したヘッダー例外が発生し、Inbuilt rawSocketStreamを使用して受信パケットをlistenすると、ストリーミングコードでByteBufferでIllegalArgumentExceptionが発生します334)

+0

'そのaddressBook'オブジェクトがどのくらいありますか? – maasg

+0

これはほとんど10kbです。 –

答えて

1

ByteBufferのマニュアルを参照すると、IllegalArgumentExceptionは負のバッファサイズを割り当てようとした場合にのみ発生します。

RawInputDStreamプロトコルには、対応するペイロードが続く整数サイズフィールドが必要です。そのフィールドは4-byte Integerです。疑問に示す

差出人プログラム:

out.writeByte(book.getSerializedSize()); 

は1つのバイトとしてとして整数サイズを書いています。したがって、読取り側がペイロードサイズをデコードしようとすると、このバイトとデコードされた結果が負の整数になるペイロードの情報とを組み合わせるので、破損したフィールドを読み込みます。

解決策ではなく、4バイト(32ビット)整数を書くことをする必要があります:

out.writeInt(book.getSerializedSize()); 
+0

こんにちは、長きがバイトとして書かれていたので、私はReceiverを自分で作成したときに、あなたの変更はストリームでうまくいきましたが、rawSocketStreamではまだ運がありません。 –

+0

@LokeshKumarPプロデューサのバグを修正した後でも、全く同じエラーでRawDStreamが失敗しますか? – maasg

+0

はい、バイトバッファで容量が0より小さい同じillegalargumentexcepion –

2

私はJavaStreamingContextクラスのrawSocketStreamメソッドを使用して、同様の問題を調査してきました。私の場合、byte []データを送受信したいと思っていました。

元の質問に関しては、カスタムReceiverを記述することなくrawSocketStreamを使用してデータを受信するという目的を達成できます。このソリューションは、送信側でObjectOutputStreamをどのように使用するかに基づいています。 Lokeshが発見したように、Socket出力ストリームを使用してObjectOutputStreamオブジェクトを構築すると、IllegalArgumentExceptionが発生します。私はObjectOutputStreamのコンストラクタにByteArrayOutputStreamオブジェクトを渡すこと

ServerSocket serverSocket = new ServerSocket(9999); 
Socket clientSocket = serverSocket.accept(); 
OutputStream outputStream = clientSocket.getOutputStream(); 
ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
ObjectOutputStream oos = new ObjectOutputStream(bos); 

は注意:送信者がソケットを構築し、次のように流れている場合ただし、例外は発生しません。元のアプローチの根本的な問題は、ObjectOutputStreamのコンストラクタが、シリアライズストリームヘッダを基になるストリーム(https://docs.oracle.com/javase/8/docs/api/java/io/ObjectOutputStream.html#ObjectOutputStream-java.io.OutputStream-)に書き込むことです。基本となるストリームがclientSocket出力ストリームの場合、Spark側の未処理のソケットストリームは、接続時に直ちにシリアル化ストリームヘッダーデータを受け取り、受信ByteBufferオブジェクトの4バイトバッファサイズとして解釈します。明らかに、このヘッダーの最初の4バイトは、受信者がそれを負の整数として解釈し、ByteBuffer allocateメソッドがIlleglaArgumentExceptionをスローするようにするために、最上位ビットとして1を持たなければなりません。

次に、ObjectOutputStreamを使用して、シリアル化されたデータをByteArrayOutputStreamオブジェクトに書き込みます。 byte []データを送信していても、write(byte[])メソッドが例外を発生させました。代わりにwriteObject()メソッドを使用して、byte []データをwriteObject()に渡す必要がありました。

ByteArrayOutputStreamがロードされた後、そのサイズを計算し、サイズをソケット出力ストリームに書き込みます。このデータをObjectOutputStreamではなくソケット出力ストリームに書き込んで、ByteArrayOutputStreamデータを書き込む前にサイズデータを書き込んでください。サイズは4バイトとして送信する必要があるが、outputStream.write()メソッドは下位8ビットのみを書き込むため、正しいデータを送信するためにいくつかのビット演算子を使用する必要があります。

これは、送信者のコードは次のようになります。

受信側で
try(ServerSocket serverSocket = new ServerSocket(9999); 
     Socket clientSocket = serverSocket.accept(); 
     OutputStream outputStream = clientSocket.getOutputStream(); 
     ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
     ObjectOutputStream oos = new ObjectOutputStream(bos);) 
{ 
    byte[] bytes; 
    // Load the byte[] with data 
    ... 

    oos.writeObject(bytes); 
    oos.flush(); 
    oos.close(); 

    outputStream.write(bos.size() >> 24); 
    outputStream.write(bos.size() >> 16); 
    outputStream.write(bos.size() >> 8); 
    outputStream.write(bos.size()); 
    outputStream.write(bos.toByteArray()); 

    // Keep socket connections open 
} 
catch (IOException e) { 
    e.printStackTrace(); 
} 

、関連するコードはこれです:

SparkConf conf = new SparkConf().setAppName("log jamming").setMaster("local[2]"); 
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1)); 
JavaReceiverInputDStream<byte[]> bytes = jsc.rawSocketStream("localhost", 9999); 

// Have fun with the RDD 

jsc.start(); 
jsc.awaitTermination();