2016-10-07 8 views
2

私は春の統合アプリケーションがあり、すべてのデータが処理された後に閉じる必要があります。 appContext.close()を明示的に呼び出すと、すべてのデータが時間内に処理されるわけではありません(Thread.sleep()を設定しない限り)。アプリケーションコンテキストでcloseを呼び出さないと、アプリケーションが自動的に閉じることができないバックグラウンドポーラーがあるので、アプリケーションは停止しません。どのように私のサービスアクティベータ(最後に処理のチェーン)のいずれかでアプリケーション全体を停止するように通知するには?アプリケーションからのポーリングと終了を停止するメッセージの送信方法

  1. 最初のBeanは線でストレージラインからデータを読み取り、すべてのスレッドより
  2. 並行して起こっているループ
  3. 処理チェーンは、ポーリング可能キュー
  4. を介して単一のスレッドにメッセージを送信している間にgateway.send(data)を介して送りますそして、ここでは、最初のBeanで読み込まれたすべてのメッセージが処理されたことを認識すると、アプリケーションを停止する必要があります。

コントロールサービスを使用して最後のサービスアクティベータを停止しようとしましたが、それは助けにはならなかった

おかげ

UPDATEここ

は、いくつかのコードの例です:

ランナー:

public class Runner { 
static Logger log = LoggerFactory.getLogger(Service2.class); 

public static void main(String[] args) throws InterruptedException { 
    log.info("START APP"); 
    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class); 
    RootService service = context.getBean(RootService.class); 
    service.start(); 
    service.stop(); 
    context.close(); 
    log.info("END APP"); 
} 

}

RootService:

@Component 
public class RootService { 

    Logger log = LoggerFactory.getLogger(RootService.class); 

    @Autowired 
    MyGateway gateway; 

    int totalSize = 0; 

    public void start() { 
     List<String> source = generateSource(); 
     totalSize = source.size(); 
     //imitate very long but finite process 
     for (String s : source) { 
      gateway.send(s, totalSize); 
     } 
     log.info("end sending data"); 
    } 

    public void stop() throws InterruptedException { 
     log.info("sending stop signal..."); 
     while (gateway.sendStop(totalSize)<0) { 
      Thread.sleep(100); 
      log.info("sending stop signal..."); 
     } 
     log.info("THE END"); 
    } 


    private List<String> generateSource() { 
     List<String> result = new ArrayList<String>(); 
     for (int i = 0; i < 15; i++) { 
      result.add("data" + i); 
     } 
     return result; 
    } 
} 

サービス1

@Component 
public class Service1 { 

    public String dodo(String data) throws InterruptedException { 
     //doing a job in parallel 
     Thread.sleep(100); 
     return data + "-" + Thread.currentThread().getName(); 
    } 
} 

サービス2:

@Component 
public class Service2 { 
    Logger log = LoggerFactory.getLogger(Service2.class); 
    int counter = 0; 

    public void dodo(String data) { 
     log.info("data: {}-{}", data, Thread.currentThread().getName()); 
     counter++; 
     log.info("counter: {}", counter); 
    } 

    public Integer dodo(Integer data) { 
     if (counter < data) { 
      return -1; 
     } else { 
      return 0; 
     } 
    } 
} 


@Component 
public class ErrorHandler { 

    Logger log = LoggerFactory.getLogger(Service2.class); 

    public void handleError(Message<?> message) { 
     log.info("ERROR: {}", message); 
    } 
} 

とxml設定

<?xml version="1.0" encoding="UTF-8"?> 
<beans:beans xmlns="http://www.springframework.org/schema/integration" 
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd 
     http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd" 
      xmlns:beans="http://www.springframework.org/schema/beans" 
      xmlns:task="http://www.springframework.org/schema/task"> 


    <gateway id="myGateway" 
      service-interface="com.dimas.MyGateway" 
      default-request-channel="channel1" 
      error-channel="errorChannel" 
      default-reply-timeout="3000"> 
     <method name="send" request-channel="channel1"/> 
     <method name="sendStop" request-channel="channel2" reply-channel="channel3"/> 
    </gateway> 

    <channel id="channel1"> 
     <dispatcher task-executor="executor"/> 
    </channel> 
    <channel id="channel2"> 
     <queue/> 
    </channel> 
    <channel id="channel3"> 
     <queue/> 
    </channel> 
    <channel id="errorChannel"/> 

    <service-activator input-channel="errorChannel" ref="errorHandler" method="handleError"/> 

    <service-activator input-channel="channel1" output-channel="channel2" ref="service1"/> 
    <service-activator input-channel="channel2" output-channel="channel3" ref="service2"> 
     <poller fixed-delay="0"/> 
    </service-activator> 

    <task:executor id="executor" pool-size="2"/> 
</beans:beans> 

今では動作します - すべてのデータが処理されるときにアプリが停止され、それがストップコード0を返します。しかし、私はログのような多くのエラーのように表示されます:

にErrorMessage [ペイロード= org.springframework.messaging.core.DestinationResolutionException: は出力チャネル、または利用可能なreplyChannelヘッダ、 ヘッダー= {ID = 639ca939-8110-4486-6a2b-5d36c7bfdbcd、 タイムスタンプ= 1475872251269}いません]

エラーハンドラが正常にそれをキャッチが、問題でしたどこが実現

何かが間違っています。最後のService2は、任意の収入メッセージのためにsoemthingを返します。 stopRequest要求に対してのみコードに応答するようにRedidします。

だけで簡単なソリューション

答えて

0

がある場合は、あなたが完了している検出できた場合、あなたはアプリケーション・コンテキストを閉じることができない理由、それははっきりしていない不思議。

デフォルトのtaskSchedulerをデーモンスレッドを使用するものに置き換えることができます。

EDITすべての

まず、あなたはすべてのそれらのキューチャネルを必要としません。単に実行チャネルであるchannel1を使用すると、並行性が得られます。キューチャネルを使用すると、オーバーヘッドが追加されます。

いずれの場合でも、プロセスが完了した時点を判断するには、getCounter()メソッドをService2に追加します。あなたのメインの方法では、コンテキストからservice2を取得し、カウンタが期待した数に増加するまで待ってください。

それとも、あなたはサービス2のラッチのカウントダウンを追加することができます - それを設定するためのメソッドを追加...

CountDownLatch latch = new CountDownLatch(source.size()); 
service2.setCountDownLatch(latch); 
for (

... 
if (!latch.await(...)) { // add a timeout in case is never completes) 
    // failure 
} 
+0

私はソースからのすべてのレコードをポーリングしますが、私はまだそれらの全てまで待機する必要があることを検出することができます処理されます。最後のメッセージを処理した場合、最後のサービスアクティベータにpingを実行した場合、ゲートウェイに別のコールを追加しました。しかし、私は多くの未知の出力や返信チャンネルを持っています –

+0

申し訳ありません - あなたの状況を理解するのは難しいです。いくつかの設定例とその問題点を正確にご記入ください。 –

+0

コード例を追加 –

関連する問題