2017-03-07 7 views
0

Nettyプロキシから2台のサーバーにトラフィックを複製する方法を模索しています。 すなわち代わりに一般的な実装の:トラフィックを複製するNettyプロキシ

Server1に - >プロキシ - >サーバー私は次の操作を行いたい2

サーバー1 - >プロキシ - >サーバー2およびサーバ3

サーバ3->プロキシが破棄されました

したがって、各メッセージはサーバ2とサーバ3の両方に送信されます。

サーバ3のためにプロキシとサーバ2間の通信をブロックしてはならないという制約が1つあります(サーバ3が遅い場合など) 。

私は、次のコードから始めています:https://github.com/dawnbreaks/TcpProxy

は残念ながら、私はネッティーとあまり慣れていないけど、実装が私の目的のために非常に最適と思われます。私は理解したいと思います:サーバー3

  • への通信のために無効にするAPI
  • サーバ3のための新しいチャネルを作成する方法

    1. 読み、サーバーからメッセージを削除する方法を3
  • 答えて

    3

    はIRC#nettyであなたのチャットを見ました。

    ここにいくつかのものがあります。あなたのプロキシは、サーバ1が接続するサーバ側を持つ必要があります。次に、サーバー2とサーバー3は、プロキシからの接続を除くか、またはUDP(依存)を使用してプロキシからデータを受信する必要があります。

    Nettyにはプロキシサーバーの例があります。これはあなたのケースではうまくいくでしょうし、3番目のパートにとっては本当に簡単です。既存のサンプルを使用して、サーバー3となる新しい接続を開きます。次に、プロキシから両方のチャネルを取得します(サーバー2と3のクライアント接続)。チャネルグループに入れ、 2台のサーバーに!私のコード例では、サーバー1からサーバー2への通信を相互に許可し、サーバー3のみがデータを受信できる一方、サーバー3がプロキシに応答すると、プロキシは何もしません。バッファを解放するためのハンドラを追加したり、サーバ3から来てはならないデータを処理したりすることもできます。また、ここから開始する必要がありますが、nettyドキュメント、api、examples、pptなどがあります。

    あなたにはいくつかの変更されたコードが添付されます。ここには例へのリンクがあります。

    Netty Proxy Server Examples

    あなたはHexDumpProxyFrontendHandler.classを編集して、ちょうどサーバ3のための新しいクライアントのための第二のブートストラップを追加する例についてだから。

    現在のコード

    41  @Override 
    42  public void channelActive(ChannelHandlerContext ctx) { 
    43   final Channel inboundChannel = ctx.channel(); 
    44 
    45   // Start the connection attempt. 
    46   Bootstrap b = new Bootstrap(); 
    47   b.group(inboundChannel.eventLoop()) 
    48   .channel(ctx.channel().getClass()) 
    49   .handler(new HexDumpProxyBackendHandler(inboundChannel)) 
    50   .option(ChannelOption.AUTO_READ, false); 
    51   ChannelFuture f = b.connect(remoteHost, remotePort); 
    52   outboundChannel = f.channel(); 
    53   f.addListener(new ChannelFutureListener() { 
    54    @Override 
    55    public void operationComplete(ChannelFuture future) { 
    56     if (future.isSuccess()) { 
    57      // connection complete start to read first data 
    58      inboundChannel.read(); 
    59     } else { 
    60      // Close the connection if the connection attempt has failed. 
    61      inboundChannel.close(); 
    62     } 
    63    } 
    64   }); 
    65  } 
    

    編集コード

    import io.netty.bootstrap.Bootstrap; 
    import io.netty.channel.Channel; 
    import io.netty.channel.ChannelFuture; 
    import io.netty.channel.ChannelFutureListener; 
    import io.netty.channel.ChannelHandlerContext; 
    import io.netty.channel.ChannelOption; 
    
    /* 
    * Copyright 2012 The Netty Project 
    * 
    * The Netty Project licenses this file to you under the Apache License, 
    * version 2.0 (the "License"); you may not use this file except in compliance 
    * with the License. You may obtain a copy of the License at: 
    * 
    * http://www.apache.org/licenses/LICENSE-2.0 
    * 
    * Unless required by applicable law or agreed to in writing, software 
    * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
    * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
    * License for the specific language governing permissions and limitations 
    * under the License. 
    */ 
    package io.netty.example.proxy; 
    
    import io.netty.buffer.Unpooled; 
    import io.netty.channel.ChannelInboundHandlerAdapter; 
    import io.netty.channel.group.ChannelGroup; 
    import io.netty.channel.group.DefaultChannelGroup; 
    import io.netty.util.concurrent.GlobalEventExecutor; 
    
    public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter { 
    
        private final String remoteHost; 
        private final int remotePort; 
    
        // As we use inboundChannel.eventLoop() when buildling the Bootstrap this does not need to be volatile as 
        // the server2OutboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel. 
        private Channel server2OutboundChannel; 
        private Channel server3OutboundChannel; 
    
        // TODO You should change this to your own executor 
        private ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 
    
        public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) { 
         this.remoteHost = remoteHost; 
         this.remotePort = remotePort; 
        } 
    
        @Override 
        public void channelActive(ChannelHandlerContext ctx) { 
         final Channel inboundChannel = ctx.channel(); 
    
         // Start the connection attempt to SERVER 3 
         Bootstrap server3Bootstrap = new Bootstrap(); 
         server3Bootstrap.group(inboundChannel.eventLoop()) 
           .channel(ctx.channel().getClass()) 
           // You are only writing traffic to server 3 so you do not need to have a handler for the inbound traffic 
           .handler(new DiscardServerHandler()) // EDIT 
           .option(ChannelOption.AUTO_READ, false); 
         ChannelFuture server3Future = server3Bootstrap.connect(remoteHost, remotePort); 
         server3OutboundChannel = server3Future.channel(); 
    
    
         // Start the connection attempt to SERVER 2 
         Bootstrap server2Bootstrap = new Bootstrap(); 
         server2Bootstrap.group(inboundChannel.eventLoop()) 
           .channel(ctx.channel().getClass()) 
           .handler(new HexDumpProxyBackendHandler(inboundChannel)) 
           .option(ChannelOption.AUTO_READ, false); 
         ChannelFuture server2Future = server2Bootstrap.connect(remoteHost, remotePort); 
         server2OutboundChannel = server2Future.channel(); 
         server2Future.addListener(new ChannelFutureListener() { 
          @Override 
          public void operationComplete(ChannelFuture future) { 
           if (future.isSuccess()) { 
            // connection complete start to read first data 
            inboundChannel.read(); 
           } else { 
            // Close the connection if the connection attempt has failed. 
            inboundChannel.close(); 
           } 
          } 
         }); 
    
         // Here we are going to add channels to channel group to save bytebuf work 
         channels.add(server2OutboundChannel); 
         channels.add(server3OutboundChannel); 
        } 
    
        // You can keep this the same below or use the commented out section 
        @Override 
        public void channelRead(final ChannelHandlerContext ctx, Object msg) { 
         // You need to reference count the message +1 
         msg.retain(); 
         if (server2OutboundChannel.isActive()) { 
          server2OutboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { 
           @Override 
           public void operationComplete(ChannelFuture future) { 
            if (future.isSuccess()) { 
             // was able to flush out data, start to read the next chunk 
             ctx.channel().read(); 
            } else { 
             future.channel().close(); 
            } 
           } 
          }); 
         } 
         if (server3OutboundChannel.isActive()) { 
          server3OutboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { 
           @Override 
           public void operationComplete(ChannelFuture future) { 
            if (future.isSuccess()) { 
             // was able to flush out data, start to read the next chunk 
             ctx.channel().read(); 
            } else { 
             future.channel().close(); 
            } 
           } 
          }); 
         } 
    
    
         // Optional to the above code instead channel writing automatically cares for reference counting for you 
    //  channels.writeAndFlush(msg).addListeners(new ChannelFutureListener() { 
    // 
    //   @Override 
    //   public void operationComplete(ChannelFuture future) throws Exception { 
    //    if (future.isSuccess()) { 
    //     // was able to flush out data, start to read the next chunk 
    //     ctx.channel().read(); 
    //    } else { 
    //     future.channel().close(); 
    //    } 
    //   } 
    //  }); 
        } 
    
        @Override 
        public void channelInactive(ChannelHandlerContext ctx) { 
         if (server2OutboundChannel != null) { 
          closeOnFlush(server2OutboundChannel); 
         } 
         if (server3OutboundChannel != null) { 
          closeOnFlush(server3OutboundChannel); 
         } 
    
    
         // Optionally can do this 
    //  channels.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); 
        } 
    
        @Override 
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
         cause.printStackTrace(); 
         closeOnFlush(ctx.channel()); 
        } 
    
        /** 
        * Closes the specified channel after all queued write requests are flushed. 
        */ 
        static void closeOnFlush(Channel ch) { 
         if (ch.isActive()) { 
          ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); 
         } 
        } 
    } 
    

    破棄ハンドラ

    これは、サーバ3によってプロキシに書き込まれたものをすべて破棄するためのハンドラとしてサーバ3に追加できます。デフォルトでは、SimpleInboundHandlersは、参照カウントを減らして処理したメッセージを破棄します。

    Discard Handler Code

    +0

    あなたの応答のために、私はこの上で行くとあなたに戻って取得するので、多くのおかげで... – tsar2512

    +0

    これは自動的にサーバ3がブロックされ、またはスローダウンを持っている場合、それは影響を与えないことを保証んまたはサーバー1 - >サーバー2間の通信を減速させますか? – tsar2512

    +0

    サーバー2に加えて、ソケット3からサーバー3にデータが残っている唯一の追加負荷があります。チャネルグループを使用して内部的に処理しても、パフォーマンスに大きな影響はありません。サーバー3は技術的にプロキシに書き込むことができますが、データは処理されません。破棄ハンドラを挿入することができます。プロキシへの接続用の破棄ハンドラをサーバー3に含めるための回答を編集しました。サーバー3からのすべてのデータは、プロキシに書き込むと破棄されます。 – Underbalanced

    関連する問題