2017-07-17

Configuring a Spring Integration Aggregator via the Java DSL on JDK 7

This is my first time configuring Spring Integration with the DSL on Java 7. Lambda expressions only work on Java 8, so I followed the example in "Spring Integration Java DSL (pre Java 8): Line by line tutorial". Below is my configuration, which collects every 100 messages for the same resource so that they can be sent to a remote RESTful service.

@Bean 
public IntegrationFlow rawDataParsingAndSendingFlow(@Autowired HttpRequestExecutingMessageHandler httpOutboundAdapter, 
              @Autowired @Qualifier("headerEnricher") HeaderEnricher headerEnricher) { 

    return IntegrationFlows.from("rawStringParsingRequestChannel") 
          .transform(new RawStringToCheckDataMessageTransformer()) 
          .transform(new DataMessageToDtoTransformer()) 
          .aggregate(new Consumer<AggregatorSpec>(){ 

           @Override public void accept(AggregatorSpec aggregatorSpec) { 
            aggregatorSpec.processor(new SimpleMessageGroupProcessor(), null) 
                .correlationStrategy(new HeaderAttributeCorrelationStrategy("resource")) 
                .releaseStrategy(new MessageCountReleaseStrategy(100)) 
                .sendPartialResultOnExpiry(true) 
                .groupTimeoutExpression("60000") ; 
           } 
          }) 
          .transform(headerEnricher) 
          .transform(new ObjectToJsonTransformer()) 
          .handle(httpOutboundAdapter) 
          .get(); 
} 

However, the configuration does not work for me, and the following exception is thrown:

Exception in thread "main" java.lang.IllegalStateException: Failed to process message list 
    at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:79) 
    at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.aggregatePayloads(MethodInvokingMessageGroupProcessor.java:86) 
    at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.processMessageGroup(AbstractAggregatingMessageGroupProcessor.java:84) 
    at org.springframework.integration.dsl.AggregatorSpec$MessageGroupProcessorWrapper.processMessageGroup(AggregatorSpec.java:127) 
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:665) 
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:418) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135) 
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:392) 
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477) 
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429) 
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) 
    at com.sun.proxy.$Proxy45.sendRawData(Unknown Source) 
    at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever$1.extractData(HistoricDataRetriever.java:82) 
    at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever$1.extractData(HistoricDataRetriever.java:68) 
    at org.springframework.jdbc.core.JdbcTemplate$1.doInPreparedStatement(JdbcTemplate.java:697) 
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:633) 
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:684) 
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:716) 
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:726) 
    at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever.retrieveHistoricData(HistoricDataRetriever.java:92) 
    at prototype.healthcloud.historic.data.pusher.Application.main(Application.java:119) 
Caused by: org.springframework.expression.AccessException: Unable to access property 'payload' through getter method 
    at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:640) 
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:211) 
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94) 
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:81) 
    at org.springframework.expression.spel.ast.MethodReference.getArguments(MethodReference.java:154) 
    at org.springframework.expression.spel.ast.MethodReference.getValueRef(MethodReference.java:71) 
    at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:66) 
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:87) 
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131) 
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330) 
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169) 
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:319) 
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:160) 
    at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:73) 
    ... 61 more 
Caused by: java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:636) 
    ... 74 more 
Caused by: java.lang.IllegalStateException: Invalid method parameter for payload: was expecting collection. 
    at org.springframework.util.Assert.state(Assert.java:70) 
    at org.springframework.integration.util.MessagingMethodInvokerHelper$ParametersWrapper.getPayload(MessagingMethodInvokerHelper.java:920) 
    ... 79 more 

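The root cause is in the generateExpression method of org.springframework.integration.util.MessagingMethodInvokerHelper$HandlerMethod: the annotationType is null, and the parameter type org.springframework.integration.store.MessageGroup is neither a Collection (or Collection<Message<?>>) sub-interface nor an array, so the expression is set to '#target.processMessageGroup(payload)'. I think an additional block of logic is needed there to handle the MessageGroup type correctly.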
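The type check described above can be illustrated in plain Java. This is only a simplified sketch of the decision as the question describes it, not the actual Spring Integration source; `SketchGroup` and `ParameterTypeCheck` are hypothetical names introduced for illustration.

```java
import java.util.Collection;

// Hypothetical stand-in for a group type that holds messages
// but is not itself a Collection (as with MessageGroup).
interface SketchGroup {
    Collection<Object> getMessages();
}

final class ParameterTypeCheck {
    // True when a method parameter of this type could be handed
    // a list of payloads or messages: a Collection subtype or an array.
    static boolean acceptsList(Class<?> parameterType) {
        return Collection.class.isAssignableFrom(parameterType)
                || parameterType.isArray();
    }
}
```

Under this check a `List` or array parameter qualifies, while a group-like type that merely exposes its messages through a getter does not, which matches the fallback to a single `payload` argument seen in the stack trace.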

Since my aggregation logic is very simple, I was able to find a workaround by specifying an outputExpression as follows:

@Bean 
public IntegrationFlow rawDataParsingAndSendingFlow(@Autowired HttpRequestExecutingMessageHandler httpOutboundAdapter, 
              @Autowired @Qualifier("headerEnricher") HeaderEnricher headerEnricher) { 

    return IntegrationFlows.from("rawStringParsingRequestChannel") 
          .transform(new RawStringToCheckDataMessageTransformer()) 
          .transform(new DataMessageToDtoTransformer()) 
          .aggregate(new Consumer<AggregatorSpec>(){ 

           @Override public void accept(AggregatorSpec aggregatorSpec) { 
            aggregatorSpec.outputExpression("#this.![payload]") 
                .correlationStrategy(new HeaderAttributeCorrelationStrategy("resource")) 
                .releaseStrategy(new MessageCountReleaseStrategy(100)) 
                .sendPartialResultOnExpiry(true) 
                .groupTimeoutExpression("60000") ; 
           } 
          }) 
          .transform(headerEnricher) 
          .transform(new ObjectToJsonTransformer()) 
          .handle(httpOutboundAdapter) 
          .get(); 
} 
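The expression `#this.![payload]` uses SpEL collection projection (`.![...]`). Assuming the expression is evaluated against the group's collection of messages, its effect corresponds roughly to the plain-Java loop below. `SketchMessage` and `PayloadProjection` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a message; only the payload matters here.
final class SketchMessage {
    private final Object payload;

    SketchMessage(Object payload) {
        this.payload = payload;
    }

    public Object getPayload() {
        return payload;
    }
}

final class PayloadProjection {
    // Plain-Java counterpart of the projection "#this.![payload]":
    // visit each element and collect its payload, preserving order.
    static List<Object> project(List<SketchMessage> group) {
        List<Object> payloads = new ArrayList<Object>(group.size());
        for (SketchMessage message : group) {
            payloads.add(message.getPayload());
        }
        return payloads;
    }
}
```

The aggregated message therefore carries a list of payloads rather than a list of messages, which is why the downstream JSON transformer can serialize it directly.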

So far the workaround works for me, but my question is: how do I configure the processor when the aggregation logic is complex?

The Spring Integration version is 4.3.10.RELEASE.

Answer


aggregatorSpec.processor(new SimpleMessageGroupProcessor(), null)

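You can't use a concrete MessageGroupProcessor with that method; it needs a POJO bean and a method name (which can be null if the bean has only one eligible method).

Use

aggregatorSpec.outputProcessor(new SimpleMessageGroupProcessor())

instead. Note that the output from that processor will be a message group, which is probably not what you want.

You may prefer the DefaultAggregatingMessageGroupProcessor (which is the default when you don't specify an outputProcessor).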
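To show how the pieces divide the work when the aggregation logic is more involved, here is a minimal plain-Java model: a correlation key selects the group, a count threshold releases it, and an output processor receives the whole released group. This is not Spring Integration's API or implementation; `GroupOutputProcessor` and `CountingAggregator` are hypothetical names, and group timeouts are not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical callback: receives the whole released group and returns the output.
interface GroupOutputProcessor<T, R> {
    R process(List<T> group);
}

// Simplified model: group items by key and release a group once it reaches the threshold.
final class CountingAggregator<T, R> {
    private final int releaseSize;
    private final GroupOutputProcessor<T, R> outputProcessor;
    private final Map<String, List<T>> groups = new HashMap<String, List<T>>();

    CountingAggregator(int releaseSize, GroupOutputProcessor<T, R> outputProcessor) {
        this.releaseSize = releaseSize;
        this.outputProcessor = outputProcessor;
    }

    // Returns the processed output when the group for this key is complete, else null.
    R add(String correlationKey, T item) {
        List<T> group = groups.get(correlationKey);
        if (group == null) {
            group = new ArrayList<T>();
            groups.put(correlationKey, group);
        }
        group.add(item);
        if (group.size() < releaseSize) {
            return null; // group not complete yet
        }
        groups.remove(correlationKey); // release: discard stored state for this key
        return outputProcessor.process(group);
    }
}
```

In this model, any complex logic lives entirely in the `process` implementation, which sees the complete group at once; the grouping and release rules stay unchanged.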

Neither SimpleMessageGroupProcessor without a method name nor DefaultAggregatingMessageGroupProcessor works; both produce the same exception.

A POJO aggregator works correctly, although it looks a bit odd, like `public class PayloadExtractionAggregator { @Aggregator public List extract(List dtos) { return dtos; } }`, with the DSL being `aggregatorSpec.processor(payloadExtractingAggregator(), "extract")`.

Don't put code in comments; it's not very readable. Edit the question (or answer) instead. Perhaps you misread my answer? When using a concrete `MessageGroupProcessor`, the method is `.outputProcessor(...)` instead of `.processor(...)`.
