2016-07-18 7 views
2

SQLクエリの出力を並列処理しようとしています。以下は私のコードです。私はAggregatorでsysoutを持っています。しかし、私は無作為にAggregatorのsysoutが印刷されていないことを知っています。また、アグリゲーターの解放メソッドは、SYSOUTを印刷していません。私は、どこかでメッセージを失っていると思う。誰でも光を放つことができますか?Spring統合スプリッタの後に失われたメッセージ。データがアグリゲータに無作為に届かない

<int:bridge input-channel="inputChannel" output-channel="dbRequestChannel" /> 

     <jdbc:outbound-gateway request-channel="dbRequestChannel" 
      max-rows-per-poll="0" data-source="dataSource" reply-channel="headerEnricher" 
      query="select empname, empno, empdob from employee where empno = 1234" /> 

     <int:header-enricher input-channel="headerEnricher" 
      output-channel="splitterChannel"> 
      <int:header name="payloadSize" value="3"></int:header> 
     </int:header-enricher> 

     <int:chain input-channel="splitterChannel" output-channel="splitterOutputChannel"> 
      <int:splitter /> 
     </int:chain> 


     <int:channel id="splitterOutputChannel"> 
      <int:dispatcher task-executor="sampleTaskExecutor" /> 
     </int:channel> 

     <bean id="sampleTaskExecutor" 
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
      <property name="corePoolSize" value="5" /> 
      <property name="maxPoolSize" value="10" /> 
      <property name="queueCapacity" value="25" /> 
     </bean> 

     <int:service-activator input-channel="splitterOutputChannel" 
      ref="springIntegrationtest" method="testMethod" output-channel="aggregatePayload"> 
     </int:service-activator> 

     <int:aggregator input-channel="aggregatePayload" 
      release-strategy-method="release" output-channel="nullChannel" 
      send-partial-result-on-expiry="true" ref="springIntegrationtest" 
      method="aggregateData" /> 

    @RunWith(SpringJUnit4ClassRunner.class) 
@ContextConfiguration(locations = { "classpath:spring-integration.xml" }) 
public class SpringIntegrationTest { 


    @Autowired 
    private MessageChannel inputChannel; 

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 



    @Test 
    public void testQueue() { 
     Message<String> quoteMessage = MessageBuilder 
       .withPayload("testPayload").build(); 
     inputChannel.send(quoteMessage); 
    } 


    public Map<String, String> testMethod(Message<?> m) { 

     System.out.println(sdf.format(new Date())); 
     return (Map<String, String>) m.getPayload(); 
    } 

    public boolean release(ArrayList<Map<String, Object>> payload) { 
     boolean release = false; 
     int size = payload.size(); 
     if (size == 3) { 
      release = true; 
     } 
     System.out.println(release); 
     return release; 
    } 

    public Message<String> aggregateData(ArrayList<Map<String, Object>> payload) { 

     System.out.println("In aggregateData " + payload); 
     Message<String> quoteMessage = MessageBuilder 
       .withPayload("testPayload").build(); 
     return quoteMessage; 
    } 

} 

答えて

1

あなたの問題は、状態とオプションの組み合わせにあると思います。

あなたがこの持っている:

int size = payload.size(); 
if (size == 3) { 
    release = true; 
} 

をので、あなたのアグリゲータは3アイテムが到着した直後にグループを解放しようとしている。一方、あなたは、分割後より多くの項目を有することができます。

releaseアグリゲータはグループを終了して終了します。デフォルトではexpireGroupsUponCompletion = falseのようなオプションがあります。つまり、のグループを店舗に保持しますが、completedの状態です。

アグリゲータタプルを3だけ放出するという目標がある場合は、expireGroupsUponCompletiontrueに切り替えることを検討する必要があります。詳細については、アグリゲータmanualを参照してください。

+0

クエリの出力は常に3つの結果しか返しません。だから、テストのために、私はこの条件を追加しました。 –

+0

org.springframework.integrationのログをデバッグしてください。 –

関連する問題