1

amazon sqsキューからのメッセージとそれまでの動作状態を消費するように統合フローを設定しようとしています。しかし、私は分や秒ごとにメッセージの数をペースしたいと思います。例えば毎分20メッセージ。ここで バネ統合を使用してsqsキューの消費をペースする方法

は私のSQLリスナーBean

@Bean 
    public MessageProducer mySqsMessageDrivenChannelAdapter() { 
     SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName); 
     adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS); 

     adapter.setVisibilityTimeout(TIMEOUT_VISIBILITY); 
     adapter.setWaitTimeOut(TIMEOUT_MESSAGE_WAIT); 
     adapter.setMaxNumberOfMessages(prefetch); 
     adapter.setOutputChannel(processMessageChannel()); 
     return adapter; 
    } 

あなたは、私が投票ごとにフェッチするメッセージの最大数を設定していますが、どのようにポーリング間の遅延時間を設定するために見ることができるように定義ですか?

通常のjmsキューでは、カスタムポーラーを使用してJMS.inboundAdapterを使用できますが、SqsMessageDrivenChannelAdapterを使用すると、どのポーリングタイマー値も設定できません。

多分私はSqsMessageDrivenChannelAdapter以外のMessageProducerを使うことができましたが、どちらを使うのでしょうか?

sqsを使用してJMS.inboundAdapterを設定することはできますか?

+0

解決策はこちらをご覧ください。https://stackoverflow.com/questions/29667321/polling-interval-for-jms-messagelistener-with-sqs-provider?rq=1この場合、この質問は重複していると見なすことができますが、ここでのポイントは私がスプリング統合を使用していることです。私はそこに存在する解決策を試すつもりですが、私はこの問題を解決します。 –

答えて

1

スプリング統合SqsMessageDrivenChannelAdapterは、メッセージドライバのアクティブコンポーネントです。これは、長時間実行しているSpringh Cloud AWSプロジェクトのSimpleMessageListenerContainerに基づいており、AmazonSQS.receiveMessage()と呼ぶループがwhile()です。そのループ内のロジックはあまりにも複雑ではありません。

try { 
    ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest()); 
    CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size()); 
    for (Message message : receiveMessageResult.getMessages()) { 
     if (isQueueRunning()) { 
      MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes); 
       getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor)); 
     } else { 
      messageBatchLatch.countDown(); 
     } 
    } 
    try { 
     messageBatchLatch.await(); 
    } catch (InterruptedException e) { 
     Thread.currentThread().interrupt(); 
    } 
} catch (Exception e) { 

あなたは私たちがmessageBatchLatchそこに作成し、ループの後、それを待って見るように。 それぞれのメッセージは、ので処理されます。このメッセージの最後には、MessageExecutorの末尾にcountDown()が入ります。だから、あなたがしたいと思うのは、ターゲットサービス方法の人工的なThread.sleep()で達成されるかもしれませんが、SQS投票の間にもう少し間隔があります。

しかし、私はあなたの要求を聞いて、私たちは本当にのようなものを追加する必要があります。

/** 
* The sleep interval in milliseconds used in the main loop between shards polling cycles. 
* Defaults to {@code 1000} minimum {@code 250}. 
* @param idleBetweenPolls the interval to sleep between shards polling cycles. 
*/ 
public void setIdleBetweenPolls(int idleBetweenPolls) { 
    this.idleBetweenPolls = Math.max(250, idleBetweenPolls); 
} 

私はKinesisMessageDrivenChannelAdapterのためにこれをやったが、ここではSimpleMessageListenerContainerのためにそれを行うために春クラウドAWSを要求しなければなりません。

関連する問題