ソケットからデータを読み取るには、RecordParser
を使用します。ソケット接続では、データは、多くの場合、改行文字で区切られます。
RecordParser parser = RecordParser.newDelimited("\n", sock);
RecordParser
はそれがFlowable
に変換することができVert.x ReadStream
です:
今
FlowableHelper.toFlowable(parser)
EchoRequestMessage
ができる場合Buffer
から作成:
public class EchoRequestMessage {
private String message;
public static EchoRequestMessage fromBuffer(Buffer buffer) {
// Deserialize
}
public String getMessage() {
return message;
}
}
そして0123に変換:
public class EchoResponseMessage {
private final String message;
public EchoResponseMessage(String message) {
this.message = message;
}
public Buffer toBuffer() {
// Serialize;
}
}
あなたはエコーサーバーフロー実装するRxJava演算子を使用することができます。
vertx.createNetServer().connectHandler(sock -> {
RecordParser parser = RecordParser.newDelimited("\n", sock);
FlowableHelper.toFlowable(parser)
.map(EchoRequestMessage::fromBuffer)
.map(echoRequestMessage -> {
return new EchoResponseMessage(echoRequestMessage.getMessage());
})
.subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
throwable.printStackTrace();
sock.close();
}, sock::close);
}).listen(1234);
を[EDIT]あなたのプロトコルメッセージをライン分離が、長さ接頭辞されていないであれば、あなたが作成することができますカスタムReadStream
:
class LengthPrefixedStream implements ReadStream<Buffer> {
final RecordParser recordParser;
boolean prefix = false;
private LengthPrefixedStream(ReadStream<Buffer> stream) {
recordParser = RecordParser.newFixed(4, stream);
}
@Override
public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
recordParser.exceptionHandler(handler);
return this;
}
@Override
public ReadStream<Buffer> handler(Handler<Buffer> handler) {
if (handler == null) {
recordParser.handler(null);
return this;
}
recordParser.handler(buffer -> {
if (prefix) {
prefix = false;
recordParser.fixedSizeMode(buffer.getInt(0));
} else {
prefix = true;
recordParser.fixedSizeMode(4);
handler.handle(buffer);
}
});
return this;
}
@Override
public ReadStream<Buffer> pause() {
recordParser.pause();
return this;
}
@Override
public ReadStream<Buffer> resume() {
recordParser.resume();
return this;
}
@Override
public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
recordParser.endHandler(endHandler);
return this;
}
}
そしてFlowable
に変換:
FlowableHelper.toFlowable(new LengthPrefixedStream(sock))
こんにちは@tsegismont、ありがとうございましたあなたの答えです。しかし、私が受け取っているデータは、改行文字で区切られていません。実際にはデリミタは使用されていません。メッセージは、メッセージの長さを含むメッセージの最初の4バイトの形式で送信されます。従って、メッセージの長さは可変である。どのようにこれらの可変長メッセージを解析するためにRecordParserを使用しますか? – emjay
@emjay次に私自身のReadStreamを作成し、それを 'Flowable'に変換します。私は答えを更新します。 – tsegismont
Brilliant!あなたのお手伝いをしていただきありがとうございますtsegismont! – emjay