2017-05-15 16 views
0

リアタイルネットの前にNetty Tcp Serverを組み込んだ方法は、サーバーのブートストラップを作成してカスタムパイプラインクラスを追加することでした。 Reactor-NettyにはTcpServer.create()がありますが、NettyInboundとNettyOutboundをとり、Monoを返す新しい機能インタフェースを作成する必要があるようです。 私のパイプラインを構築するChannelInitializerを追加したいのであれば、NettyContextを取得するためにブロックする必要があります。 着信メッセージは機能インターフェイスによって受信され、私は応答を送信できますが、何も私のパイプラインを経由しません。パイプラインを使用したReactor Netty TcpServer

私たちをReactor Nettyにして、カスタマイズされたパイプラインを通ってメッセージを流す方法はありますか?

neverComplete()を使用してMono.just( "Hi")を返すと、接続が確立され、メッセージが受信されたときにクライアントに「Hi」が正常に送信されますが、パイプラインにオフロードする必要があります結果をクライアントにフィードバックします。

public void startServer() throws InterruptedException{ 
    EventLoopGroup group = new NioEventLoopGroup(1); 
    try { 
     final TcpServer server = TcpServer.create(opts -> opts 
      .eventLoopGroup(group) 
      .listen(tcpSocketAddress)); 
     server 
      .newHandler((in, out) -> { 
       in.receive() 
        .take(1) 
        .log(ApolloApplicationTests.class.getName()) 
        .subscribe(data -> { 
         log.info("Server Received: {}", data.toString(CharsetUtil.UTF_8)); 
         latch.countDown(); 
        }); 
        return out.sendString(Mono.just("Hi")).neverComplete(); 
      }) 
      .block().addHandler(clientEndPoint) 
      .channel() 
      .closeFuture().sync(); 

    } finally { 
     group.shutdownGracefully().sync(); 
    } 
} 



import java.util.List; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Configurable; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.stereotype.Component; 
import io.netty.buffer.ByteBuf; 
import io.netty.channel.Channel; 
import io.netty.channel.ChannelHandler; 
import io.netty.channel.ChannelHandlerAdapter; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelPipeline; 
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; 
import io.netty.handler.codec.LengthFieldPrepender; 
import io.netty.handler.codec.MessageToMessageDecoder; 
import reactor.util.Logger; 
import reactor.util.Loggers; 

@Configurable 
@Component 
public class ClientEndPoint extends ChannelInitializer<Channel> { 

final Logger log = Loggers.getLogger(ApolloApplication.class); 

private ChannelPipeline pipeline; 

@Autowired 
private ChannelHandlerAdapter messageInterchange; 

@Autowired 
private LengthFieldBasedFrameDecoder lowOrderVliDecoder; 

@Autowired 
private MessageToMessageDecoder<ByteBuf> messageDecoder; 

@Autowired 
private LengthFieldPrepender vliEncoder; 

@Autowired 
@Qualifier("inBound") 
List<ChannelHandler> inBoundHandlers; 

@Autowired 
@Qualifier("outBound") 
List<ChannelHandler> outBoundHandlers; 

    @Override 
    protected void initChannel(Channel sc) throws Exception { 
     this.pipeline = sc.pipeline(); 
     this.pipeline.addLast("lowOrderVliDecoder", this.lowOrderVliDecoder); 

     this.pipeline.addLast("messageDecoder", this.messageDecoder); 

     this.pipeline.addLast("vliEncoder", this.vliEncoder); 

     for (ChannelHandler handler : this.inBoundHandlers) { 
      this.pipeline.addLast(handler); 
     } 

     this.pipeline.addLast("messageInterchange", this.messageInterchange); 

     for (ChannelHandler handler : this.outBoundHandlers) { 
      this.pipeline.addLast(handler); 
     } 
    } 


    public void accept(Channel sc) { 
     this.pipeline = sc.pipeline(); 
     this.pipeline.addLast("lowOrderVliDecoder", this.lowOrderVliDecoder); 

     this.pipeline.addLast("messageDecoder", this.messageDecoder); 

     this.pipeline.addLast("vliEncoder", this.vliEncoder); 

     for (ChannelHandler handler : this.inBoundHandlers) { 
      this.pipeline.addLast(handler); 
     } 

     this.pipeline.addLast("messageInterchange", this.messageInterchange); 

     for (ChannelHandler handler : this.outBoundHandlers) { 
      this.pipeline.addLast(handler); 
     } 
    } 
} 

答えて

0

だから、この私は、パイプラインが消費者を実装し、典型的な網状パイプラインとして受け入れる方法でパイプラインを構築するクラスです

public Mono<? extends NettyContext> initializeServer() throws InterruptedException { 
    this.log.debug("Server Initializing"); 
    BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> serverHandler = (in, 
      out) -> { 
     in.receive().asString().subscribe(data -> { 
      this.log.debug("Received " + data + " on " + in); 
     }); 
     return Flux.never(); 
    }; 

    TcpServer server = TcpServer.create(opts -> opts.afterChannelInit(pipeline).listen(tcpSocketAddress)); 
    return server.newHandler(serverHandler); 
} 

を考え出しました。

その後、私は、サーバーに

private void startServer(Mono<? extends NettyContext> connected) { 
     ChannelFuture f = connected.block(Duration.ofSeconds(5)).channel() 
       .closeFuture(); 
     final CountDownLatch channelLatch = new CountDownLatch(1); 
     f.addListener(new ChannelFutureListener() { 

      @Override 
      public void operationComplete(ChannelFuture cf) throws Exception { 
       log.debug("Channel Disconnected"); 

      } 

     }); 
     f.awaitUninterruptibly(); 
     // Now we are sure the future is completed. 
     assert f.isDone(); 

     if (f.isCancelled()) { 
      this.log.warn("Connection Cancelled"); 
     } else if (!f.isSuccess()) { 
      if (f.cause() != null) { 
       f.cause().printStackTrace(); 
      } else { 
       this.log.warn("Connection not successful"); 
      } 
     } else { 
      channelLatch.countDown(); 
      this.log.info("Server Start Successful"); 
     } 
     try { 
      channelLatch.await(); 
     } catch (InterruptedException ex) { 
      throw new CancellationException("Interrupted while waiting for streaming " + "connection to arrive."); 
     } 
} 
を開始
関連する問題