私はSpring AMQPを使用してメッセージを聞きます(設定にはlistener-container, service-activator, chain, bridge & aggregators
があります)。アプリケーションの起動時に、AMQPは望ましくないメッセージの読み取りを開始します。私はauto-startup=false
を試しましたが、動作しません。私は何か不足していますか?AMQP自動メッセージの読み込みを防止する
また、動作する場合は、プログラムで再起動するにはどうすればよいですか?私はlistenerContainer.start();
を試しました。アグリゲーターについては&はどうですか?続き
EDIT
は私の設定です:
<rabbit:queue name="my_queue1" declared-by="consumerAdmin"/>
<rabbit:queue name="my_queue2" declared-by="consumerAdmin"/>
<rabbit:queue name="my_batch1" declared-by="consumerAdmin"/>
<int-amqp:channel id="myPollableChannel" message-driven="false" connection-factory="consumerConnFactory" queue-name="my_queue2"/>
<int-event:inbound-channel-adapter channel="myPollableChannel" auto-startup="false"/>
<int-amqp:channel id="myAggregateChannel" connection-factory="consumerConnFactory"/>
<int-event:inbound-channel-adapter channel="myAggregateChannel" auto-startup="false"/>
<int-amqp:channel id="myChannel" connection-factory="consumerConnFactory"/>
<int-event:inbound-channel-adapter channel="myChannel" auto-startup="false"/>
<int-amqp:channel id="myFailedChannel" connection-factory="consumerConnFactory"/>
<int-event:inbound-channel-adapter channel="myFailedChannel" auto-startup="false"/>
<rabbit:template id="genericTopicTemplateWithRetry" connection-factory="connectionFactory" exchange="my_exchange" retry-template="retryTemplate"/>
<rabbit:topic-exchange name="my_exchange" declared-by="consumerAdmin">
<rabbit:bindings>
<rabbit:binding queue="my_queue1" pattern="pattern1"/>
<rabbit:binding queue="my_queue2" pattern="pattern1"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<int:handler-retry-advice id="retryAdvice" max-attempts="5" recovery-channel="myFailedChannel">
<int:exponential-back-off initial="3000" multiplier="5.0" maximum="300000"/>
</int:handler-retry-advice>
<int:bridge input-channel="myPollableChannel" output-channel="myAggregateChannel">
<int:poller max-messages-per-poll="100" fixed-rate="5000"/>
</int:bridge>
<int:aggregator id="myBatchAggregator"
ref="myAggregator"
correlation-strategy="myCorrelationStrategy"
release-strategy="myReleaseStrategy"
input-channel="myAggregateChannel"
output-channel="myChannel"
expire-groups-upon-completion="true"
send-partial-result-on-expiry="true"
group-timeout="1000" />
<int:chain input-channel="myFailedChannel">
<int:transformer expression="'Failed to publish messages to my channel:' + payload.failedMessage.payload" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
<int:service-activator input-channel="myChannel" output-channel="nullChannel" ref="myWorker" method="myMethod">
<int:request-handler-advice-chain><ref bean="retryAdvice" /></int:request-handler-advice-chain>
</int:service-activator>
<rabbit:listener-container connection-factory="consumerConnFactory" requeue-rejected="false" concurrency="1">
<rabbit:listener ref="myListener" method="listen" queue-names="queues1" admin="consumerAdmin" />
</rabbit:listener-container>
あなたの設定を共有する必要があります。 AMQPからのメッセージを読まないで、インバウンド・チャネル・アダプタに 'auto-startup = false'があります。それで十分です。あなたはアグリゲータやその他について心配する必要はありません –
Thanks Artem。上記の私の設定を追加しました。私は次の関数を使って再びONにしています: 'private void startMessageListeners(){ final Map containers = this.applicationContext.getBeansOfType(AbstractMessageListenerContainer.class); if(containers!= null &&!containers.isEmpty()){ for(final AbstractMessageListenerContainerコンテナ:containers.values()){ if(!container.isRunning()){ container.start(); } } } } お勧めします。 –
Ameya