2017-10-24 25 views
1

ありがとうございます!私は、多くのクライアントをサーバーに接続し、クライアントとサーバー間の非同期メッセージを容易にすることを目的とした、シンプルなTCP統合に取り組んでいます。Springインテグレーション - スレッドアウトバウンドチャンネル

1つの接続がストールすると(切断されずにすべてのパケットが破棄されます)、他のすべての接続が回復するまでメッセージの受信を停止します。クライアントとの通信が他のクライアントとの接続不安から独立するようにするには、実装または変更する必要がありますか?次のように

サーバーの設定は次のとおりです。

import org.apache.log4j.Logger; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.context.event.EventListener; 
import org.springframework.integration.annotation.ServiceActivator; 
import org.springframework.integration.channel.ExecutorChannel; 
import org.springframework.integration.config.EnableIntegration; 
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter; 
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler; 
import org.springframework.integration.ip.tcp.connection.*; 
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer; 
import org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer; 
import org.springframework.messaging.Message; 
import org.springframework.messaging.MessageChannel; 
import org.springframework.messaging.MessageHeaders; 
import org.springframework.scheduling.TaskScheduler; 

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; 



@EnableIntegration 
@Configuration 
public class IntegrationConfig { 
    private static final Logger log = Logger.getLogger(IntegrationConfig.class); 

    @Value("${listen.port:8000}") 
    private int port; 

    @Autowired 
    Relayer relayer; 

    @Bean //for accepting text message from TCP, putty 
    public MessageChannel fromTcp() { 
     return new ExecutorChannel(threadPoolTaskExecutor()); 
    } 

    @Bean //inbound, it is working, I could read the inbound message while debugging 
    public TcpReceivingChannelAdapter in(
      AbstractServerConnectionFactory connectionFactory) { 
     TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); 
     adapter.setOutputChannel(fromTcp()); 
     adapter.setConnectionFactory(connectionFactory); 

     return adapter; 
    } 
    @Bean 
    public TaskScheduler taskScheduler() { 
     ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); 
     taskScheduler.setPoolSize(100); 
     return taskScheduler; 
    } 

    @Bean 
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() { 
     ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); 
     threadPoolTaskExecutor.setMaxPoolSize(100); 
     threadPoolTaskExecutor.setCorePoolSize(100); 
     return threadPoolTaskExecutor; 
    } 

    @Bean 
    public MessageChannel toTcp() { 
     ExecutorChannel directChannel = new ExecutorChannel(threadPoolTaskExecutor()); 
     relayer.setOutboundChannel(directChannel); 
     return directChannel; 
    } 

    @ServiceActivator(inputChannel = "toTcp") 
    @Bean 
    public TcpSendingMessageHandler out(
      AbstractServerConnectionFactory connectionFactory) { 
     TcpSendingMessageHandler tcpOutboundAdp = new TcpSendingMessageHandler(); 
     tcpOutboundAdp.setConnectionFactory(connectionFactory); 
     return tcpOutboundAdp; 
    } 

    @ServiceActivator(inputChannel = "fromTcp") 
    public void handleIncompingMessage(Message<byte[]> stringMsg) { 
     String new_message = new String(stringMsg.getPayload()); 
     new_message = new_message.replaceAll("\r", ""); 
     new_message = new_message.replaceAll("\n", ""); 
     relayer.processIncomingMessage((String) stringMsg.getHeaders().get("ip_connectionId"), new_message); 
    } 

    @Bean 
    public AbstractServerConnectionFactory serverCF() { 
     TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(this.port); 
     tcpNetServerConnectionFactory.setSingleUse(false); 
     tcpNetServerConnectionFactory.setSerializer(new ByteArrayCrLfSerializer()); 
     tcpNetServerConnectionFactory.setDeserializer(new ByteArrayLfSerializer()); 
     return tcpNetServerConnectionFactory; 
    } 
    @EventListener 
    public void onApplicationEvent(TcpConnectionOpenEvent event) { 
     relayer.newConnection(event.getConnectionId()); 
    } 

    @EventListener 
    public void onApplicationEvent(TcpConnectionCloseEvent event) { 
     relayer.deleteConnection(event.getConnectionId()); 
    } 
} 

これは、多くの例をもとに、再度され、その1つのクライアントが出て遅れている場合を除き、すべての作品。私は、私のアプリケーションの他の部分を出力チャンネルと話していますが、途中で何らかのアグリゲーターを持っていればいいですか?

乾杯!

+0

私は完全には理解していないと確信していますあなたが描いている問題は何ですか?しかし、1つの接続が他の接続に影響を及ぼすようなフレームワークでは何も考えられません。おそらく何が起こっているかを見るためにスレッドダンプを取るでしょうか?おそらく、あなたの 'Relayer'コードを追加しますか?デバッグロギングも役に立ちます。 –

+0

こんにちはゲイリー、ここで超大ファン! :) relayerが行っているのは、別のアプリケーションに送信された情報を送信することです。そのアプリケーションからのデータが返され、送信チャネルを通じて送信されるという点があります。接続が維持されている限り、すべてが完全に機能します。私はしばらく困惑しており、これは正常ではないと私は同意する。私の考えは、エグゼキュータプールがソケットを探して詰まってしまって、他のクライアントに配信するエグゼキュータがもうないということです。私はスレッドダンプを実行し、それが役立つ場合は投稿します。ありがとう!! – Jwebster

+0

@GaryRussellここでは、クライアントがメッセージを受信して​​いないラグ中のスレッドダンプです。http://textuploader.com/d4iwr – Jwebster

答えて

1

何かが足りないわけではありません。 threadPoolTaskExecutor-9は、ソケットにデータを送信しようとしてブロックされます。接続オブジェクトがロックされている...

- locked <0x0000000746902810> (a org.springframework.integration.ip.tcp.connection.TcpNetConnection) 

9他のスレッドが(send()が​​ある)そのロックを取得するために待機しているので、あなたが同時に同じソケットに複数のメッセージを書き込もうとしている表示されます。

他のソケットへの送信に問題はありません。そのソケットに送信する際に問題があります(おそらくバッファがいっぱいであるため)。

EDIT

ここではすべての送信を実行するための一つの方法は、ソケットごとに単一のスレッドに送るのです...

@SpringBootApplication 
public class So46917862Application { 

    private static final Logger LOGGER = LoggerFactory.getLogger(So46917862Application.class); 

    public static void main(String[] args) { 
     SpringApplication.run(So46917862Application.class, args).close(); 
    } 

    @Bean 
    public ApplicationRunner runner() { 
     return args -> { 
      Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234); 
      socket.getOutputStream().write("foo\r\n".getBytes()); 
      socket.getOutputStream().write("bar\r\n".getBytes()); 
      Thread.sleep(10_000); 
      socket.close(); 
     }; 
    } 

    @Bean 
    public TcpReceivingChannelAdapter adapter() { 
     TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); 
     adapter.setConnectionFactory(server()); 
     adapter.setOutputChannel(inbound()); 
     return adapter; 
    } 

    @Bean 
    public MessageChannel inbound() { 
     return new DirectChannel(); 
    } 

    @ServiceActivator(inputChannel = "inbound") 
    @Bean 
    public MessageHandler asyncResponder() { 
     SimpleAsyncTaskExecutor exec = new SimpleAsyncTaskExecutor(); 
     return m -> 
      exec.execute(() -> { 
       LOGGER.info("Initiating on this thread"); 
       toTcp().send(new GenericMessage<>("FOO", m.getHeaders())); 
      }); 
    } 

    @Bean 
    public AbstractServerConnectionFactory server() { 
     return new TcpNetServerConnectionFactory(1234); 
    } 

    @ServiceActivator(inputChannel = "outbound") 
    @Bean 
    public TcpSendingMessageHandler handler() { 
     TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); 
     handler.setConnectionFactory(server()); 
     return handler; 
    } 

    @Bean 
    public MessageChannel toTcp() { 
     return new DirectChannel(); 
    } 

    @Bean 
    public MessageChannel outbound() { 
     return new DirectChannel(); 
    } 

    @Bean 
    public SingleThreadPerConnection sender() { 
     return new SingleThreadPerConnection(outbound()); 
    } 

    public static class SingleThreadPerConnection implements ApplicationListener<TcpConnectionCloseEvent> { 

     private static final Logger LOGGER = LoggerFactory.getLogger(SingleThreadPerConnection.class); 

     private final Map<String, ThreadPoolTaskExecutor> executors = new HashMap<>(); 

     private final MessagingTemplate messagingTemplate; 

     public SingleThreadPerConnection(MessageChannel channel) { 
      this.messagingTemplate = new MessagingTemplate(channel); 
     } 

     @Override 
     public synchronized void onApplicationEvent(TcpConnectionCloseEvent event) { 
      this.executors.remove(event.getConnectionId()).shutdown(); 
      LOGGER.info("Removed executor for " + event.getConnectionId()); 
     } 

     @ServiceActivator(inputChannel = "toTcp") 
     public void sendToThread(final Message<?> message) { 
      executorFor((String) message.getHeaders().get(IpHeaders.CONNECTION_ID)) 
       .execute(() -> { 
        LOGGER.info("Sending on this thread"); 
        this.messagingTemplate.send(message); 
       }); 
     } 

     private synchronized TaskExecutor executorFor(String connectionId) { 
      Assert.state(connectionId != null, "No connection id header present"); 
      if (this.executors.get(connectionId) == null) { 
       ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor(); 
       exec.setThreadNamePrefix(connectionId + "-exec-"); 
       exec.initialize(); 
       this.executors.put(connectionId, exec); 
      } 
      return this.executors.get(connectionId); 
     } 

    } 

} 

結果:

2017-10-25 09:37:16.250 INFO 54983 --- [cTaskExecutor-1] com.example.So46917862Application   
: Initiating on this thread 
2017-10-25 09:37:16.250 INFO 54983 --- [cTaskExecutor-2] com.example.So46917862Application   
: Initiating on this thread 
2017-10-25 09:37:16.253 INFO 54983 --- [20b926b7-exec-1] 862Application$SingleThreadPerConnection 
: Sending on this thread 
2017-10-25 09:37:16.253 INFO 54983 --- [20b926b7-exec-1] 862Application$SingleThreadPerConnection 
: Sending on this thread 
+0

それは私にはちょっと聞こえます。だから、私は各ソケットにメッセージをキューに入れて、すべてのスレッドを詰まらせることがない、何かがありますか?または、私が使用するはずの別のチャネルアダプタがありますか?これを見てくれてありがとう! – Jwebster

+0

興味深い。私はそれにいくつかの考えを与える必要があります。多分、ある種の特殊なタスクエグゼキュータです。それはおそらく明日だろう。 –

+0

あなたは、開発者の大部分です。 – Jwebster

関連する問題