2017-01-18 14 views
1

私はSpring AMQPを使用してメッセージを聞きます(設定にはlistener-container, service-activator, chain, bridge & aggregatorsがあります)。アプリケーションの起動時に、AMQPは望ましくないメッセージの読み取りを開始します。私はauto-startup=falseを試しましたが、動作しません。私は何か不足していますか?AMQP自動メッセージの読み込みを防止する

また、動作する場合は、プログラムで再起動するにはどうすればよいですか?私はlistenerContainer.start();を試しました。アグリゲーターについては&はどうですか?続き

EDIT

は私の設定です:

<rabbit:queue name="my_queue1" declared-by="consumerAdmin"/> 
<rabbit:queue name="my_queue2" declared-by="consumerAdmin"/> 
<rabbit:queue name="my_batch1" declared-by="consumerAdmin"/> 

<int-amqp:channel id="myPollableChannel" message-driven="false" connection-factory="consumerConnFactory" queue-name="my_queue2"/> 
<int-event:inbound-channel-adapter channel="myPollableChannel" auto-startup="false"/> 

<int-amqp:channel id="myAggregateChannel" connection-factory="consumerConnFactory"/> 
<int-event:inbound-channel-adapter channel="myAggregateChannel" auto-startup="false"/> 

<int-amqp:channel id="myChannel" connection-factory="consumerConnFactory"/> 
<int-event:inbound-channel-adapter channel="myChannel" auto-startup="false"/> 

<int-amqp:channel id="myFailedChannel" connection-factory="consumerConnFactory"/> 
<int-event:inbound-channel-adapter channel="myFailedChannel" auto-startup="false"/> 

<rabbit:template id="genericTopicTemplateWithRetry" connection-factory="connectionFactory" exchange="my_exchange" retry-template="retryTemplate"/> 

<rabbit:topic-exchange name="my_exchange" declared-by="consumerAdmin"> 
     <rabbit:bindings> 
      <rabbit:binding queue="my_queue1" pattern="pattern1"/> 
      <rabbit:binding queue="my_queue2" pattern="pattern1"/> 
     </rabbit:bindings> 
</rabbit:topic-exchange> 

<int:handler-retry-advice id="retryAdvice" max-attempts="5" recovery-channel="myFailedChannel"> 
    <int:exponential-back-off initial="3000" multiplier="5.0" maximum="300000"/> 
</int:handler-retry-advice> 

<int:bridge input-channel="myPollableChannel" output-channel="myAggregateChannel"> 
    <int:poller max-messages-per-poll="100" fixed-rate="5000"/> 
</int:bridge> 

<int:aggregator id="myBatchAggregator" 
    ref="myAggregator" 
    correlation-strategy="myCorrelationStrategy" 
    release-strategy="myReleaseStrategy" 
    input-channel="myAggregateChannel" 
    output-channel="myChannel" 
    expire-groups-upon-completion="true" 
    send-partial-result-on-expiry="true" 
    group-timeout="1000" /> 

<int:chain input-channel="myFailedChannel"> 
    <int:transformer expression="'Failed to publish messages to my channel:' + payload.failedMessage.payload" /> 
    <int-stream:stderr-channel-adapter append-newline="true"/> 
</int:chain> 

<int:service-activator input-channel="myChannel" output-channel="nullChannel" ref="myWorker" method="myMethod"> 
    <int:request-handler-advice-chain><ref bean="retryAdvice" /></int:request-handler-advice-chain> 
</int:service-activator> 

<rabbit:listener-container connection-factory="consumerConnFactory" requeue-rejected="false" concurrency="1"> 
    <rabbit:listener ref="myListener" method="listen" queue-names="queues1" admin="consumerAdmin" /> 
</rabbit:listener-container> 
+0

あなたの設定を共有する必要があります。 AMQPからのメッセージを読まないで、インバウンド・チャネル・アダプタに 'auto-startup = false'があります。それで十分です。あなたはアグリゲータやその他について心配する必要はありません –

+0

Thanks Artem。上記の私の設定を追加しました。私は次の関数を使って再びONにしています: 'private void startMessageListeners(){ final Map containers = this.applicationContext.getBeansOfType(AbstractMessageListenerContainer.class); if(containers!= null &&!containers.isEmpty()){ for(final AbstractMessageListenerContainerコンテナ:containers.values()){ if(!container.isRunning()){ container.start(); } } } } お勧めします。 – Ameya

答えて

0

OK。設定していただきありがとうございます!

AMQPベースのチャネルが必要な理由はわかりませんが、主な問題はまさにそこにあります。

ただし、<int-amqp:channel>にはauto-startup="false"オプションがあります。

AMQPからデータを受信する準備ができたら、idでこれらのチャネルをstart()だけ必要とします。

+0

以下のような設定を更新しても、 ' ' 私はまだ取得しています: ' [org.springframework.integration.amqp.channel.PollableAmqpChannel](task-scheduler-1)PollableAmqpChannelでタイムアウト値でreceiveを呼び出します。受信タイムアウトがサポートされていないため、タイムアウトは無視されます。 私は何かを見逃しましたか? – Ameya

+0

もちろん。 'myPollableChannel'については' 'で' bridge'を止めなければならないからです。申し訳ありませんが、私はそれを逃した。 'message-driven =" false "は' listener'を意味しません。したがって、そのライフサイクルは 'poller'エンドポイントによって制御されます。 –

+0

私は 'PollableAmqpChannel'の'自動起動 'を開始/停止&防止するリスナーベースのチャンネルを取得しました。しかし、私はそれをプログラムで開始することはできませんでした。コンフィグ/コード: ' 「 」最終オブジェクトのチャネル= this.applicationContext.getBean( "myPollableChannel"); if(チャネルインスタンスSmartLifecycle){ リターン((SmartLifecycle)チャネル)やめる(); } else if(channel instanceof PollableAmqpChannel){ // FIXME:ライフサイクルにキャストできません。どうやって止めるの? } ' – Ameya

関連する問題