リアタイルネットの前にNetty Tcp Serverを組み込んだ方法は、サーバーのブートストラップを作成してカスタムパイプラインクラスを追加することでした。 Reactor-NettyにはTcpServer.create()がありますが、NettyInboundとNettyOutboundをとり、Monoを返す新しい機能インタフェースを作成する必要があるようです。 私のパイプラインを構築するChannelInitializerを追加したいのであれば、NettyContextを取得するためにブロックする必要があります。 着信メッセージは機能インターフェイスによって受信され、私は応答を送信できますが、何も私のパイプラインを経由しません。パイプラインを使用したReactor Netty TcpServer
私たちをReactor Nettyにして、カスタマイズされたパイプラインを通ってメッセージを流す方法はありますか?
neverComplete()を使用してMono.just( "Hi")を返すと、接続が確立され、メッセージが受信されたときにクライアントに「Hi」が正常に送信されますが、パイプラインにオフロードする必要があります結果をクライアントにフィードバックします。
public void startServer() throws InterruptedException{
EventLoopGroup group = new NioEventLoopGroup(1);
try {
final TcpServer server = TcpServer.create(opts -> opts
.eventLoopGroup(group)
.listen(tcpSocketAddress));
server
.newHandler((in, out) -> {
in.receive()
.take(1)
.log(ApolloApplicationTests.class.getName())
.subscribe(data -> {
log.info("Server Received: {}", data.toString(CharsetUtil.UTF_8));
latch.countDown();
});
return out.sendString(Mono.just("Hi")).neverComplete();
})
.block().addHandler(clientEndPoint)
.channel()
.closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageDecoder;
import reactor.util.Logger;
import reactor.util.Loggers;
@Configurable
@Component
public class ClientEndPoint extends ChannelInitializer<Channel> {
final Logger log = Loggers.getLogger(ApolloApplication.class);
private ChannelPipeline pipeline;
@Autowired
private ChannelHandlerAdapter messageInterchange;
@Autowired
private LengthFieldBasedFrameDecoder lowOrderVliDecoder;
@Autowired
private MessageToMessageDecoder<ByteBuf> messageDecoder;
@Autowired
private LengthFieldPrepender vliEncoder;
@Autowired
@Qualifier("inBound")
List<ChannelHandler> inBoundHandlers;
@Autowired
@Qualifier("outBound")
List<ChannelHandler> outBoundHandlers;
@Override
protected void initChannel(Channel sc) throws Exception {
this.pipeline = sc.pipeline();
this.pipeline.addLast("lowOrderVliDecoder", this.lowOrderVliDecoder);
this.pipeline.addLast("messageDecoder", this.messageDecoder);
this.pipeline.addLast("vliEncoder", this.vliEncoder);
for (ChannelHandler handler : this.inBoundHandlers) {
this.pipeline.addLast(handler);
}
this.pipeline.addLast("messageInterchange", this.messageInterchange);
for (ChannelHandler handler : this.outBoundHandlers) {
this.pipeline.addLast(handler);
}
}
public void accept(Channel sc) {
this.pipeline = sc.pipeline();
this.pipeline.addLast("lowOrderVliDecoder", this.lowOrderVliDecoder);
this.pipeline.addLast("messageDecoder", this.messageDecoder);
this.pipeline.addLast("vliEncoder", this.vliEncoder);
for (ChannelHandler handler : this.inBoundHandlers) {
this.pipeline.addLast(handler);
}
this.pipeline.addLast("messageInterchange", this.messageInterchange);
for (ChannelHandler handler : this.outBoundHandlers) {
this.pipeline.addLast(handler);
}
}
}