ありがとうございます!私は、多くのクライアントをサーバーに接続し、クライアントとサーバー間の非同期メッセージを容易にすることを目的とした、シンプルなTCP統合に取り組んでいます。Springインテグレーション - スレッドアウトバウンドチャンネル
1つの接続がストールすると(切断されずにすべてのパケットが破棄されます)、他のすべての接続が回復するまでメッセージの受信を停止します。クライアントとの通信が他のクライアントとの接続不安から独立するようにするには、実装または変更する必要がありますか?次のように
サーバーの設定は次のとおりです。
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
import org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@EnableIntegration
@Configuration
public class IntegrationConfig {
private static final Logger log = Logger.getLogger(IntegrationConfig.class);
@Value("${listen.port:8000}")
private int port;
@Autowired
Relayer relayer;
@Bean //for accepting text message from TCP, putty
public MessageChannel fromTcp() {
return new ExecutorChannel(threadPoolTaskExecutor());
}
@Bean //inbound, it is working, I could read the inbound message while debugging
public TcpReceivingChannelAdapter in(
AbstractServerConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setOutputChannel(fromTcp());
adapter.setConnectionFactory(connectionFactory);
return adapter;
}
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(100);
return taskScheduler;
}
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setMaxPoolSize(100);
threadPoolTaskExecutor.setCorePoolSize(100);
return threadPoolTaskExecutor;
}
@Bean
public MessageChannel toTcp() {
ExecutorChannel directChannel = new ExecutorChannel(threadPoolTaskExecutor());
relayer.setOutboundChannel(directChannel);
return directChannel;
}
@ServiceActivator(inputChannel = "toTcp")
@Bean
public TcpSendingMessageHandler out(
AbstractServerConnectionFactory connectionFactory) {
TcpSendingMessageHandler tcpOutboundAdp = new TcpSendingMessageHandler();
tcpOutboundAdp.setConnectionFactory(connectionFactory);
return tcpOutboundAdp;
}
@ServiceActivator(inputChannel = "fromTcp")
public void handleIncompingMessage(Message<byte[]> stringMsg) {
String new_message = new String(stringMsg.getPayload());
new_message = new_message.replaceAll("\r", "");
new_message = new_message.replaceAll("\n", "");
relayer.processIncomingMessage((String) stringMsg.getHeaders().get("ip_connectionId"), new_message);
}
@Bean
public AbstractServerConnectionFactory serverCF() {
TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(this.port);
tcpNetServerConnectionFactory.setSingleUse(false);
tcpNetServerConnectionFactory.setSerializer(new ByteArrayCrLfSerializer());
tcpNetServerConnectionFactory.setDeserializer(new ByteArrayLfSerializer());
return tcpNetServerConnectionFactory;
}
@EventListener
public void onApplicationEvent(TcpConnectionOpenEvent event) {
relayer.newConnection(event.getConnectionId());
}
@EventListener
public void onApplicationEvent(TcpConnectionCloseEvent event) {
relayer.deleteConnection(event.getConnectionId());
}
}
これは、多くの例をもとに、再度され、その1つのクライアントが出て遅れている場合を除き、すべての作品。私は、私のアプリケーションの他の部分を出力チャンネルと話していますが、途中で何らかのアグリゲーターを持っていればいいですか?
乾杯!
私は完全には理解していないと確信していますあなたが描いている問題は何ですか?しかし、1つの接続が他の接続に影響を及ぼすようなフレームワークでは何も考えられません。おそらく何が起こっているかを見るためにスレッドダンプを取るでしょうか?おそらく、あなたの 'Relayer'コードを追加しますか?デバッグロギングも役に立ちます。 –
こんにちはゲイリー、ここで超大ファン! :) relayerが行っているのは、別のアプリケーションに送信された情報を送信することです。そのアプリケーションからのデータが返され、送信チャネルを通じて送信されるという点があります。接続が維持されている限り、すべてが完全に機能します。私はしばらく困惑しており、これは正常ではないと私は同意する。私の考えは、エグゼキュータプールがソケットを探して詰まってしまって、他のクライアントに配信するエグゼキュータがもうないということです。私はスレッドダンプを実行し、それが役立つ場合は投稿します。ありがとう!! – Jwebster
@GaryRussellここでは、クライアントがメッセージを受信していないラグ中のスレッドダンプです。http://textuploader.com/d4iwr – Jwebster