2017-11-28 3 views
2

現在、私は外部システムと(Javaで記述する)アプリケーション間のTCP通信を必要とするプロジェクトに取り組んでいます。われわれの知る限り、これは通常のNIOを使用して簡単に達成できます。しかし、私が取り組んでいるこの新しいプロジェクトの一環として、Vert.xを使ってTCP通信を提供する必要があります。下の画像を参照してください:vert.x-rxを使用してリアクティブクライアントとサーバーのTCP通信を作成する方法

enter image description here

を右側に、私は、TCPサーバーが左側に、外部システムからの接続を待っているとして実行自分のアプリケーションを持っています。私はそれがTCPを作成し、シンプルなこのような何かの接続を監視するために読んだ:しかし

NetServer server = vertx.createNetServer(); 
server.listen(1234, "localhost", res -> { 
    if (res.succeeded()) { 
    System.out.println("Server is now listening!"); 
    } else { 
    System.out.println("Failed to bind!"); 
    } 
}); 

を、私は理解できないビットは、外部システムは、私のアプリケーションとに接続したときにどのように処理するかですTCP経由でEchoRequestMessagesを送信します。私のアプリケーションは、受信したバイトのバッファを取り、それをEchoRequestMessage POJOにデコードしてから、EchoResponseMessageをバイトのバッファにエンコードして外部システムに送り返す必要があります。

私はvertex-rxを使って、EchoRequestMessageの受信、そのデコード、EchoResponseMessageのエンコーディングの反応性プログラミングを行い、それを外部システムに送信して、すべて1つのビルダーパターンタイプセットアップ。私はObservablesと購読について読んだことがありますが、私は何を観察するのか、何を購読するのか分かりません。どんな助けでも大歓迎です。

答えて

4

ソケットからデータを読み取るには、RecordParserを使用します。ソケット接続では、データは、多くの場合、改行文字で区切られます。

RecordParser parser = RecordParser.newDelimited("\n", sock); 

RecordParserはそれがFlowableに変換することができVert.x ReadStreamです:

FlowableHelper.toFlowable(parser) 

EchoRequestMessageができる場合Bufferから作成:

public class EchoRequestMessage { 
    private String message; 

    public static EchoRequestMessage fromBuffer(Buffer buffer) { 
    // Deserialize 
    } 

    public String getMessage() { 
    return message; 
    } 
} 

そして0123に変換:

public class EchoResponseMessage { 
    private final String message; 

    public EchoResponseMessage(String message) { 
    this.message = message; 
    } 

    public Buffer toBuffer() { 
    // Serialize; 
    } 
} 

あなたはエコーサーバーフロー実装するRxJava演算子を使用することができます。

vertx.createNetServer().connectHandler(sock -> { 

    RecordParser parser = RecordParser.newDelimited("\n", sock); 

    FlowableHelper.toFlowable(parser) 
    .map(EchoRequestMessage::fromBuffer) 
    .map(echoRequestMessage -> { 
     return new EchoResponseMessage(echoRequestMessage.getMessage()); 
    }) 
    .subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> { 
     throwable.printStackTrace(); 
     sock.close(); 
    }, sock::close); 

}).listen(1234); 

を[EDIT]あなたのプロトコルメッセージをライン分離が、長さ接頭辞されていないであれば、あなたが作成することができますカスタムReadStream

class LengthPrefixedStream implements ReadStream<Buffer> { 
    final RecordParser recordParser; 
    boolean prefix = false; 

    private LengthPrefixedStream(ReadStream<Buffer> stream) { 
    recordParser = RecordParser.newFixed(4, stream); 
    } 

    @Override 
    public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) { 
    recordParser.exceptionHandler(handler); 
    return this; 
    } 

    @Override 
    public ReadStream<Buffer> handler(Handler<Buffer> handler) { 
    if (handler == null) { 
     recordParser.handler(null); 
     return this; 
    } 
    recordParser.handler(buffer -> { 
     if (prefix) { 
     prefix = false; 
     recordParser.fixedSizeMode(buffer.getInt(0)); 
     } else { 
     prefix = true; 
     recordParser.fixedSizeMode(4); 
     handler.handle(buffer); 
     } 
    }); 
    return this; 
    } 

    @Override 
    public ReadStream<Buffer> pause() { 
    recordParser.pause(); 
    return this; 
    } 

    @Override 
    public ReadStream<Buffer> resume() { 
    recordParser.resume(); 
    return this; 
    } 

    @Override 
    public ReadStream<Buffer> endHandler(Handler<Void> endHandler) { 
    recordParser.endHandler(endHandler); 
    return this; 
    } 
} 

そしてFlowableに変換:

FlowableHelper.toFlowable(new LengthPrefixedStream(sock)) 
+0

こんにちは@tsegismont、ありがとうございましたあなたの答えです。しかし、私が受け取っているデータは、改行文字で区切られていません。実際にはデリミタは使用されていません。メッセージは、メッセージの長さを含むメッセージの最初の4バイトの形式で送信されます。従って、メッセージの長さは可変である。どのようにこれらの可変長メッセージを解析するためにRecordParserを使用しますか? – emjay

+0

@emjay次に私自身のReadStreamを作成し、それを 'Flowable'に変換します。私は答えを更新します。 – tsegismont

+0

Brilliant!あなたのお手伝いをしていただきありがとうございますtsegismont! – emjay

0

着信パケットを1つのメッセージに集約するためにRecordParser.fixedSizeMode()メソッドを使用するのはかなり複雑です。

私はRecordParser.newDelimitedを使用します。 TCP受信者に送信されたデータを操作できる場合、送信されるすべてのメッセージに4バイトのメッセージ長をプレフィックスとして付ける代わりに、最後に固有の区切り文字を付けることができます。

コードサンプル:(私はRxJava構文を使用していない)

を送る:

を読ん
byte[] serialized = SerializationUtils.serialize(o); 
senderSocket.write(Buffer.buffer(serialized).appendString(MSG_DELIMITER)); 
System.out.println("I sent some bytes: " + serialized.length); 

protected static String MSG_DELIMITER = "_I_Am_so0o_UNIQUEQE_"; 
protected Vertx vertx; 
protected final RecordParser parser = RecordParser.newDelimited(MSG_DELIMITER, new Output()); 

public static main(String[] args) { 
    this.vertx = Vertx.vertx(); 
    this.vertx.deployVerticle(new Receiver()); 
} 

public class Receiver extends AbstractVerticle { 
    @Override 
    public void start() { 

     NetServer receiver = this.vertx.createNetServer(); 

     receiver.connectHandler(socket -> 

       socket.handler(parser::handle)); 

     receiver.listen(port, host, res -> { 
      if (res.succeeded()) { 
       System.out.println("Receiver is now listening!"); 
      } else { 
       System.out.println("Failed to bind!"); 
      } 
     }); 
    } 

    @Override 
    public void stop() { 
     System.out.println("Receiver stopped"); 
    } 
} 

public class Output implements Handler<Buffer> { 

    @Override 
    public void handle(Buffer event) { 
     try { 
      E deserialized = (E) SerializationUtils.deserialize(event.getBytes()); 
      queue.put(deserialized); 
      System.out.println("I received some bytes: " + event.length()); 

     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 
関連する問題