2017-10-24 17 views
0

私は、バネ統合TCPを使用して構築されたTCPクライアントを持っており、サーバはキープアライブメッセージ(ping/pongスタイル)をサポートしています。接続はCachingClientConnectionFactoryを使用して設定されています。このサーバー機能を利用したいと思います。バネ統合を使用してTCP接続でキープアライブ接続を実装する方法は?

private static final int SERIALIZER_HEADER_SIZE = 2; 

/** 
* Serializer used by connection factory to send and receive messages 
*/ 
@Bean 
public ByteArrayLengthHeaderSerializer byteArrayLengthHeaderSerializer() { 
    return new ByteArrayLengthHeaderSerializer(SERIALIZER_HEADER_SIZE); 
} 

@Bean 
public AbstractClientConnectionFactory tcpClientConnectionFactory() { 
    TcpNetClientConnectionFactory connFactory = 
     new TcpNetClientConnectionFactory(props.getUrl(), props.getPort()); 
    connFactory.setSerializer(byteArrayLengthHeaderSerializer()); 
    connFactory.setDeserializer(byteArrayLengthHeaderSerializer()); 
    connFactory.setSoTimeout(props.getSoTimeout()); 
    if (props.isUseSSL()) { 
     connFactory.setTcpSocketFactorySupport(new DefaultTcpNetSSLSocketFactorySupport(() -> { 
      return SSLContext.getDefault(); 
     })); 
    } 

    return connFactory; 
} 

/** 
* Connection factory used to create TCP client socket connections 
*/ 
@Bean 
public AbstractClientConnectionFactory tcpCachedClientConnectionFactory() { 
    CachingClientConnectionFactory cachingConnFactory = 
     new CachingClientConnectionFactory(tcpClientConnectionFactory(), props.getMaxPoolSize()); 
    cachingConnFactory.setConnectionWaitTimeout(props.getMaxPoolWait()); 
    return cachingConnFactory; 
} 

接続がかどうかを確認するために、私は、接続が開か保つことができますが、私はまた、これらのサーバー上のレバレッジを取るアライブメッセージを維持し、随時、それらのメッセージを送りたいと思ったここConfigure keep alive to keep connection alive all the timeを掲載ソリューションを使用する:ここに私のBean構成ですまだ生きています。これにより、ソケットが閉じられた場合に新しい接続を再接続/作成する必要がないため、クライアント側のパフォーマンスが向上します。

これに基づいて、誰かがスプリング統合を使用してこれを実装する方法について提案していますか?

+1

あなたの意図が正確には分かりません。 'soKeepAlive'をtrueに設定すると、オペレーティングシステムはpingを送信してソケットを開いたままにします。 'soTimeout'を設定しないと、ソケットは無期限に開いたままになります。 –

+0

サーバは 'KEEP_ALIVE_REQUEST'のようなものを期待し、' KEEP_ALIVE_RESPONSE'を返します。私の質問は、接続を開いたままにするためにそれを使用することでしたが、あなたの応答 'soKeepAlive'と' soTimeout'に基づいて、そのトリックを行うことができます。 –

+0

アップデート:@GaryRussellサーバーは30秒間使用しないとソケットを閉じます。それに基づいて、春の統合には、クライアントソケットを再利用できるように、特定のキープアライブメッセージをバックグラウンドで送信するために使用できる機能がありますか? –

答えて

1

単純なクライアント接続ファクトリを使用する場合は、アプリケーションレベルのハートビートメッセージを@InboundChannelAdapterに設定するだけで十分です。

簡単な例:あなたが開いているアイドル状態の接続のプールを維持したいと思う理由

@SpringBootApplication 
public class So46918267Application { 

    public static void main(String[] args) throws IOException { 
     // Simulated Server 
     final ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(1234); 
     ExecutorService executor = Executors.newSingleThreadExecutor(); 
     executor.execute(() -> { 
      try { 
       Socket socket = server.accept(); 
       BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); 
       String line; 
       while ((line = reader.readLine()) != null) { 
        System.out.println(line); 
        if (line.equals("keep_alive")) { 
         socket.getOutputStream().write("OK\r\n".getBytes()); 
        } 
       } 
      } 
      catch (IOException e) { 
       e.printStackTrace(); 
      } 
     }); 
     ConfigurableApplicationContext context = SpringApplication.run(So46918267Application.class, args); 
     System.out.println("Hit enter to terminate"); 
     System.in.read(); 
     executor.shutdownNow(); 
     context.close(); 
     server.close(); 
    } 

    @Bean 
    public TcpNetClientConnectionFactory client() { 
     return new TcpNetClientConnectionFactory("localhost", 1234); 
    } 

    @ServiceActivator(inputChannel = "toTcp") 
    @Bean 
    public TcpOutboundGateway gateway() { 
     TcpOutboundGateway gateway = new TcpOutboundGateway(); 
     gateway.setConnectionFactory(client()); 
     return gateway; 
    } 

    // HEARTBEATS 

    private final Message<?> heartbeatMessage = MessageBuilder.withPayload("keep_alive") 
      .setReplyChannelName("heartbeatReplies") 
      .build(); 

    @InboundChannelAdapter(channel = "toTcp", poller = @Poller(fixedDelay = "25000")) 
    public Message<?> heartbeat() { 
     return this.heartbeatMessage; 
    } 

    @ServiceActivator(inputChannel = "heartbeatReplies") 
    public void reply(byte[] reply) { 
     System.out.println(new String(reply)); 
    } 

} 

CachingClientConnectionFactoryを使用して、しかし、それは明らかではありません。ただし、プールの動作方法は、アイドル状態の接続がキューに保持され、各要求が最も古い接続に移動し、接続がキューの末尾に戻されるためです。

@InboundChannelAdapter(channel = "toTcp", 
    poller = @Poller(fixedDelay = "25000", maxMessagesPerPoll = "5")) 

が開い5つの接続まで続けるだろう...各ポーリングでメッセージの数を放出してしまう maxMessagesPerPollを追加します。新しい接続が開かない(少なくとも1つがあれば)が、プールに5つ以上の接続が含まれている場合、少なくとも5つは開いたままになります。開いている接続がない場合は、1つだけ開きます。

+0

あなたの迅速な対応に心から感謝しています。それはまさに私が探していたものでした。クライアントが新しい接続を開かなければならないたびに、プール内にいくつかの接続を開いたままにしておきたい時があります。 –

関連する問題