Configuring a Spring Integration Aggregator via the Java DSL on JDK 7

This is my first time configuring Spring Integration with the DSL on Java 7. Lambda expressions only work on Java 8, so I followed the examples in "Spring Integration Java DSL" and "Spring Integration Java DSL (pre Java 8): Line by line tutorial". My configuration, which collects every 100 messages for the same resource and sends them to a remote RESTful service, is shown below.
@Bean
public IntegrationFlow rawDataParsingAndSendingFlow(
        @Autowired HttpRequestExecutingMessageHandler httpOutboundAdapter,
        @Autowired @Qualifier("headerEnricher") HeaderEnricher headerEnricher) {
    return IntegrationFlows.from("rawStringParsingRequestChannel")
            .transform(new RawStringToCheckDataMessageTransformer())
            .transform(new DataMessageToDtoTransformer())
            .aggregate(new Consumer<AggregatorSpec>() {
                @Override
                public void accept(AggregatorSpec aggregatorSpec) {
                    aggregatorSpec.processor(new SimpleMessageGroupProcessor(), null)
                            .correlationStrategy(new HeaderAttributeCorrelationStrategy("resource"))
                            .releaseStrategy(new MessageCountReleaseStrategy(100))
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("60000");
                }
            })
            .transform(headerEnricher)
            .transform(new ObjectToJsonTransformer())
            .handle(httpOutboundAdapter)
            .get();
}
However, this configuration does not work for me; the following exception is thrown:
Exception in thread "main" java.lang.IllegalStateException: Failed to process message list
at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:79)
at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.aggregatePayloads(MethodInvokingMessageGroupProcessor.java:86)
at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.processMessageGroup(AbstractAggregatingMessageGroupProcessor.java:84)
at org.springframework.integration.dsl.AggregatorSpec$MessageGroupProcessorWrapper.processMessageGroup(AggregatorSpec.java:127)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:665)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:418)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135)
at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:392)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
at com.sun.proxy.$Proxy45.sendRawData(Unknown Source)
at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever$1.extractData(HistoricDataRetriever.java:82)
at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever$1.extractData(HistoricDataRetriever.java:68)
at org.springframework.jdbc.core.JdbcTemplate$1.doInPreparedStatement(JdbcTemplate.java:697)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:633)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:684)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:716)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:726)
at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever.retrieveHistoricData(HistoricDataRetriever.java:92)
at prototype.healthcloud.historic.data.pusher.Application.main(Application.java:119)
Caused by: org.springframework.expression.AccessException: Unable to access property 'payload' through getter method
at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:640)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:211)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:81)
at org.springframework.expression.spel.ast.MethodReference.getArguments(MethodReference.java:154)
at org.springframework.expression.spel.ast.MethodReference.getValueRef(MethodReference.java:71)
at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:66)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:87)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:319)
at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:160)
at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:73)
... 61 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:636)
... 74 more
Caused by: java.lang.IllegalStateException: Invalid method parameter for payload: was expecting collection.
at org.springframework.util.Assert.state(Assert.java:70)
at org.springframework.integration.util.MessagingMethodInvokerHelper$ParametersWrapper.getPayload(MessagingMethodInvokerHelper.java:920)
... 79 more
The root cause is in the generateExpression method of o.s.i.u.MessagingMethodInvokerHelper$HandlerMethod: the annotationType is null, and the parameter type o.s.i.s.MessageGroup is neither a sub-interface of Collection, nor a Collection<Message<?>>, nor an array, so the expression is set to '#target.processMessageGroup(payload)'. I think an additional logic block needs to be added to handle the MessageGroup type correctly.
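To make the branch described above concrete, here is a minimal sketch of that decision in plain Java. It is a simplified model of what the paragraph describes, not the framework's actual code; `MessageGroupLike` and `PayloadBindingSketch` are hypothetical names introduced only so the example compiles without Spring on the classpath.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for org.springframework.integration.store.MessageGroup,
// so the sketch compiles without Spring on the classpath.
interface MessageGroupLike {
}

// Simplified model of the decision described above; NOT the framework's actual code.
final class PayloadBindingSketch {

    // True when the parameter type is neither a Collection nor an array, i.e. the
    // case in which the single "payload" property would be passed as the argument.
    static boolean fallsBackToSinglePayload(Class<?> parameterType) {
        boolean collectionLike = Collection.class.isAssignableFrom(parameterType)
                || parameterType.isArray();
        return !collectionLike;
    }

    public static void main(String[] args) {
        System.out.println(fallsBackToSinglePayload(List.class));             // false
        System.out.println(fallsBackToSinglePayload(Object[].class));         // false
        System.out.println(fallsBackToSinglePayload(MessageGroupLike.class)); // true
    }
}
```

Under this reading, a method whose only parameter is a message group type takes the single-payload path, which matches the "was expecting collection" failure at the bottom of the trace.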
Because my aggregation logic is very simple, I was able to work around the problem by specifying an outputExpression, as follows:
@Bean
public IntegrationFlow rawDataParsingAndSendingFlow(
        @Autowired HttpRequestExecutingMessageHandler httpOutboundAdapter,
        @Autowired @Qualifier("headerEnricher") HeaderEnricher headerEnricher) {
    return IntegrationFlows.from("rawStringParsingRequestChannel")
            .transform(new RawStringToCheckDataMessageTransformer())
            .transform(new DataMessageToDtoTransformer())
            .aggregate(new Consumer<AggregatorSpec>() {
                @Override
                public void accept(AggregatorSpec aggregatorSpec) {
                    aggregatorSpec.outputExpression("#this.![payload]")
                            .correlationStrategy(new HeaderAttributeCorrelationStrategy("resource"))
                            .releaseStrategy(new MessageCountReleaseStrategy(100))
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("60000");
                }
            })
            .transform(headerEnricher)
            .transform(new ObjectToJsonTransformer())
            .handle(httpOutboundAdapter)
            .get();
}
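For readers less familiar with SpEL, `#this.![payload]` is a collection projection: it takes the `payload` property of every element of the collection it is evaluated against. A rough plain-Java equivalent is sketched below, assuming the expression is evaluated against the collection of messages in the released group. `Msg` and `PayloadProjection` are hypothetical stand-ins, not Spring types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a message; only the payload matters here.
final class Msg {
    private final Object payload;

    Msg(Object payload) {
        this.payload = payload;
    }

    Object getPayload() {
        return payload;
    }
}

final class PayloadProjection {

    // Plain-Java reading of "#this.![payload]": collect the payload of every
    // element, preserving iteration order.
    static List<Object> project(Collection<Msg> messages) {
        List<Object> payloads = new ArrayList<Object>(messages.size());
        for (Msg message : messages) {
            payloads.add(message.getPayload());
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<Msg> group = Arrays.asList(new Msg("a"), new Msg("b"));
        System.out.println(project(group)); // prints [a, b]
    }
}
```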
So far the workaround does the job for me, but my question is: how should the processor be configured when the aggregation logic is complex?
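As an illustration of what "complex" logic might look like, here is a sketch of a plain aggregation class whose method takes the list of payloads from one group. This is an assumption, not a verified fix for 4.3.10: the trace goes through MethodInvokingMessageGroupProcessor and complains that it "was expecting collection", which suggests that processor(target, methodName) is meant for a class of this shape. `BatchAggregator` and `aggregate` are hypothetical names, and de-duplication stands in for the real logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical aggregation class; the class and method names are illustrative only.
final class BatchAggregator {

    // Receives the payloads of one released group and returns the value to pass on.
    // The logic here is de-duplication that keeps the first-seen order.
    public List<String> aggregate(List<String> payloads) {
        return new ArrayList<String>(new LinkedHashSet<String>(payloads));
    }

    public static void main(String[] args) {
        List<String> payloads = Arrays.asList("a", "b", "a", "c");
        System.out.println(new BatchAggregator().aggregate(payloads)); // prints [a, b, c]
    }
}
```

Because the method takes a List rather than a message group type, it avoids the parameter shape that the root-cause analysis above identifies as the problem.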
The Spring Integration version is 4.3.10.RELEASE.