私は春の統合アプリケーションがあり、すべてのデータが処理された後に閉じる必要があります。 appContext.close()
を明示的に呼び出すと、すべてのデータが時間内に処理されるわけではありません(Thread.sleep()
を設定しない限り)。アプリケーションコンテキストでcloseを呼び出さないと、アプリケーションが自動的に閉じることができないバックグラウンドポーラーがあるので、アプリケーションは停止しません。どのように私のサービスアクティベータ(最後に処理のチェーン)のいずれかでアプリケーション全体を停止するように通知するには?アプリケーションからのポーリングと終了を停止するメッセージの送信方法
- 最初のBeanは線でストレージラインからデータを読み取り、すべてのスレッドより
- 並行して起こっているループ
- 処理チェーンは、ポーリング可能キュー
- を介して単一のスレッドにメッセージを送信している間に
gateway.send(data)
を介して送りますそして、ここでは、最初のBeanで読み込まれたすべてのメッセージが処理されたことを認識すると、アプリケーションを停止する必要があります。
コントロールサービスを使用して最後のサービスアクティベータを停止しようとしましたが、それは助けにはならなかった
おかげ
UPDATEここ
は、いくつかのコードの例です:
ランナー:
public class Runner {
static Logger log = LoggerFactory.getLogger(Service2.class);
public static void main(String[] args) throws InterruptedException {
log.info("START APP");
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
RootService service = context.getBean(RootService.class);
service.start();
service.stop();
context.close();
log.info("END APP");
}
}
RootService:
@Component
public class RootService {
Logger log = LoggerFactory.getLogger(RootService.class);
@Autowired
MyGateway gateway;
int totalSize = 0;
public void start() {
List<String> source = generateSource();
totalSize = source.size();
//imitate very long but finite process
for (String s : source) {
gateway.send(s, totalSize);
}
log.info("end sending data");
}
public void stop() throws InterruptedException {
log.info("sending stop signal...");
while (gateway.sendStop(totalSize)<0) {
Thread.sleep(100);
log.info("sending stop signal...");
}
log.info("THE END");
}
private List<String> generateSource() {
List<String> result = new ArrayList<String>();
for (int i = 0; i < 15; i++) {
result.add("data" + i);
}
return result;
}
}
サービス1
@Component
public class Service1 {
public String dodo(String data) throws InterruptedException {
//doing a job in parallel
Thread.sleep(100);
return data + "-" + Thread.currentThread().getName();
}
}
サービス2:
@Component
public class Service2 {
Logger log = LoggerFactory.getLogger(Service2.class);
int counter = 0;
public void dodo(String data) {
log.info("data: {}-{}", data, Thread.currentThread().getName());
counter++;
log.info("counter: {}", counter);
}
public Integer dodo(Integer data) {
if (counter < data) {
return -1;
} else {
return 0;
}
}
}
@Component
public class ErrorHandler {
Logger log = LoggerFactory.getLogger(Service2.class);
public void handleError(Message<?> message) {
log.info("ERROR: {}", message);
}
}
とxml設定
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:task="http://www.springframework.org/schema/task">
<gateway id="myGateway"
service-interface="com.dimas.MyGateway"
default-request-channel="channel1"
error-channel="errorChannel"
default-reply-timeout="3000">
<method name="send" request-channel="channel1"/>
<method name="sendStop" request-channel="channel2" reply-channel="channel3"/>
</gateway>
<channel id="channel1">
<dispatcher task-executor="executor"/>
</channel>
<channel id="channel2">
<queue/>
</channel>
<channel id="channel3">
<queue/>
</channel>
<channel id="errorChannel"/>
<service-activator input-channel="errorChannel" ref="errorHandler" method="handleError"/>
<service-activator input-channel="channel1" output-channel="channel2" ref="service1"/>
<service-activator input-channel="channel2" output-channel="channel3" ref="service2">
<poller fixed-delay="0"/>
</service-activator>
<task:executor id="executor" pool-size="2"/>
</beans:beans>
今では動作します - すべてのデータが処理されるときにアプリが停止され、それがストップコード0を返します。しかし、私はログのような多くのエラーのように表示されます:
にErrorMessage [ペイロード= org.springframework.messaging.core.DestinationResolutionException: は出力チャネル、または利用可能なreplyChannelヘッダ、 ヘッダー= {ID = 639ca939-8110-4486-6a2b-5d36c7bfdbcd、 タイムスタンプ= 1475872251269}いません]
エラーハンドラが正常にそれをキャッチが、問題でしたどこが実現
何かが間違っています。最後のService2は、任意の収入メッセージのためにsoemthingを返します。 stopRequest要求に対してのみコードに応答するようにRedidします。
だけで簡単なソリューション
私はソースからのすべてのレコードをポーリングしますが、私はまだそれらの全てまで待機する必要があることを検出することができます処理されます。最後のメッセージを処理した場合、最後のサービスアクティベータにpingを実行した場合、ゲートウェイに別のコールを追加しました。しかし、私は多くの未知の出力や返信チャンネルを持っています –
申し訳ありません - あなたの状況を理解するのは難しいです。いくつかの設定例とその問題点を正確にご記入ください。 –
コード例を追加 –