2012-03-19 9 views
1

全身、こんにちは!netty:なぜハンドラのコードでfuture.await()を使用できないのですか?

netty 3.1を使用してソケットデータを別のソケットサーバーに転送するソケットディスパッチサーバーを構築するため、次のmessageRecvイベントが到着したときに最初のメッセージが到着したときにnettyサーバーハンドラでクライアント接続を作成し、 、私はバッファをサーバーチャネルからクライアントチャネルに転送するだけです。しかしfuture.await *()操作を使用するときはハンドラで禁止されています。 await()を使わないと、connectFutureが同期しているので、次のメッセージが到着したときにconenctが完了していない可能性があります。私はこの問題をどう対処するかを知らない。

次のmessageRecvイベントが到着する前にクライアントの接続が完了していることを確認するにはどうすればよいですか?今

、私はちょうどこのように、2つのコードを同期するためのロックを行います。

/** 
* server handler 
*/ 
public class ServerChannelHandler extends SimpleChannelUpstreamHandler { 

    private static Logger _logger = LoggerFactory.getLogger(cn.szboc.dispatch.server.netty.ServerChannelHandler.class); 

    public ServerChannelHandler(ProxyClientFactory clientFactory) { 
    this.clientFactory = clientFactory; 
    } 

/** factory connect another server */ 
private ProxyClientFactory clientFactory; 

/** anotherchannel */ 
private Channel innerChannel; 

private ChannelFuture connectFuture; 


private ReentrantLock connectLock = new ReentrantLock(); 

private Condition notComplete = connectLock.newCondition(); 

@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 

    final ChannelBuffer buffer = ((ChannelBuffer) e.getMessage()).copy(); 
    final Channel outChannel = ctx.getChannel(); 

    // first connect 
    if (connectFuture == null) { 

     final ClientChannelHandler cch = new ClientChannelHandler(ctx.getChannel()); 

     ProxyClient client = clientFactory.retrieveClient(); 


     connectFuture = client.getConnectChannelFuture(); 


     connectFuture.addListener(new ChannelFutureListener() { 

      @Override 
      public void operationComplete(ChannelFuture future) throws Exception { 

       connectLock.lock(); 
       try { 

        if (future.isSuccess()) { 
         innerChannel = future.getChannel(); 

         innerChannel.getPipeline().addLast("clientchannelhandler", cch); 
         innerChannel.write(buffer); 
        } else { 

         Channels.fireExceptionCaught(outChannel, future.getCause()); 
        } 
       } finally { 

        notComplete.signal(); 

        connectLock.unlock(); 
       } 
      } 

     }); 

    } else { 

     connectLock.lock(); 
     try { 

      if (!connectFuture.isDone()) { 

       if (!notComplete.await(500, TimeUnit.MILLISECONDS)) { 

        throw new Exception(""); 
       } 
      } 


      if (connectFuture.isSuccess()) { 
       if(innerChannel == null){ 
        if (!notComplete.await(500, TimeUnit.MILLISECONDS)) { 

         throw new Exception(""); 
        } 
       } 
       innerChannel.write(buffer); 
      } else { 

       _logger.error(""); 
      } 

     } finally { 
      connectLock.unlock(); 
     } 

    } 

} 

答えて

1

あなたが網状にデッドロックができない可能性があるため。あなたはまた、悪いことであるIO-Workerスレッドをブロックします。あなたの状況を処理する最良の方法は、接続が完了してからディスパッチするまでメッセージを「キューに入れる」ことです。

「ChannelOpen(..)」メソッドを使用してプロキシクライアントに接続するには、Channel.setReadable(false)を前に設定します。接続が完了したら、Channel.setReadable(true)を再度呼び出して、messageEventsを処理させます。

このような何か:

@Override 
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) 
     throws Exception { 
    // Suspend incoming traffic until connected to the remote host. 
    final Channel inboundChannel = e.getChannel(); 
    inboundChannel.setReadable(false); 

    // Start the connection attempt. 
    ClientBootstrap cb = new ClientBootstrap(cf); 
    cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel())); 
    ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort)); 

    outboundChannel = f.getChannel(); 
    f.addListener(new ChannelFutureListener() { 
     @Override 
     public void operationComplete(ChannelFuture future) throws Exception { 
      if (future.isSuccess()) { 
       // Connection attempt succeeded: 
       // Begin to accept incoming traffic. 
       inboundChannel.setReadable(true); 
      } else { 
       // Close the connection if the connection attempt has failed. 
       inboundChannel.close(); 
      } 
     } 
    }); 
} 

は、より詳細[1]のプロキシ例を参照してください。

[1] https://github.com/netty/netty/tree/3.2/src/main/java/org/jboss/netty/example/proxy

+0

はどうもありがとうございました、私はあなたの第二の溶液を選択しますが、機能inboundChannel.setReadable(false)を見つけます。非同期であり、返されたchannelFuture beanは、それ自体に追加されたlistnerを呼び出さないようです。 – xinglu

+0

私はnetty 3.1コード、クラスorg.jboss.netty.channel.socket.nio.NioWorker行659行に "void setInterestOps( NioSocketChannel channel、ChannelFuture future、int interestOps)関数でステップします"キー== nullまたはセレクターがnullの場合、関数は単に戻り値であり、将来の状態は設定されません。真実を伝えるために、私はもうnettyコアコードを捕まえません。しかし、私は "inboundChannel.setReadable(false)"の設定操作は単に関数の呼び出しではなく、非同期ではないと思います。私はあなたからの2番目のソリューションを使用することが安全だと思う、ありがとう! – xinglu

+0

申し訳ありませんあなたの問題を理解できません..詳細を教えてください。 –

関連する問題