2017-05-26 16 views
2

のスプリッタ、サービスアクティベータ同時ユニット私は春の統合 - JPAインバウンドチャネルアダプタ、作業

  1. ロードN倍の資産がチャネルに、JPAで、ステータスXを持つ非常に単純Spring統合パイプラインを設定している
  2. スプリットついにY

に、このPIPの同期的な性質をエンティティのステータスを更新し、サービスアクティベータを使用して新しいチャネルに各JPAエンティティ

  • プロセスJPAエンティティ、 elineとは、JPAインバウンド・チャネル・アダプタは、以前のインバウンド・チャネル・アダプタ内のすべてのメッセージを一度起動し、次に分割チャネルが処理された後にのみ送信されます。nullChannel

    これはうまくいきますが、

    サービスアクティベータはいくつかのことを行いますが、そのうちの1つは外部REST APIを呼び出してからアセットのステータスを更新するため、#1から除外されます。

    ここで問題となるのは、サービスアクティベータが1つのメッセージを処理するのに約1秒かかることです(この時間のほとんどはREST APIの呼び出しです)。そのため、250個のJPAエンティティのキ​​ューには250秒プロセス。

    REST APIを並行して、たとえば5回呼び出しても1秒かかるとします。我々は多分パイプライン全体が同期「作業単位」として実行できるようになりますAggregatorTask Executorを、追加して、当社のパイプラインに作るが、許可することができ、簡単な変更がある場合

    だから、私は疑問に思ってService Activatorが同時に処理されます。

    これはまあ、aggregatorはとにかく本当にすべての返信を待っているために行くための正しい方法ですが、一緒にExecutorChannelsplitter後のあなたの無料ポーラーの手との統合の設定

    <channel id="newAssetChannel" /> 
    <channel id="splitAssetChannel" /> 
    
    <int-jpa:inbound-channel-adapter 
         id="newAssetChannelAdapter" 
         channel="newAssetChannel" 
         entity-manager-factory="entityManagerFactory" 
         entity-class="com.foo.domain.Asset" 
         jpa-query="select a from Asset a where (a.status = 'NEW' or a.status = 'UPDATED') and a.health = 'OK' ORDER BY a.priority DESC, a.updatedDate ASC" 
         max-results="250"> 
        <poller fixed-rate="5000" max-messages-per-poll="1" /> 
    </int-jpa:inbound-channel-adapter> 
    
    <splitter expression="payload" 
        input-channel="newAssetChannel" 
        output-channel="splitNewAssetChannel" /> 
    
    <service-activator 
        id="newAssetServiceActivator" 
        input-channel="splitNewAssetChannel" 
        output-channel="nullChannel" 
        ref="assetProcessor" 
        method="processNew" /> 
    
  • 答えて

    1

    です。したがって、並列分割結合の前後でいくつかの障壁が必要です。

    あなたは<gateway>でそれを行うことができます。

    <gateway id="gateway" default-request-channel="splitterChannel"/> 
    
    <service-activator id="gatewayTestService" input-channel="newAssetChannel" output-channel="saveRowsChannel" ref="gateway"/> 
    

    スプリッタのoutput-channelExecutorChannelでなければなりません。 newAssetServiceActivatoraggregatorに出力する必要があります。アグリゲータには、gatewayへの返信を意味するoutput-channelがありません。

    +0

    これは私を適切な道に導き、間違いなくSpring Integrationの一般的な理解を深めてくれます。完全にアップデートされた設定はここにあります(私が改善できる何かをしたら、要点は分かります) - gist.github.com/shainegordon/66da307b2e89f69e549b675ffbfd363 e – kabal