2016-07-28 24 views
1

与えられた時間に作業中のsqsキューから1つのアイテムを許可しようとしています。現在のところ、キューの1つのメッセージだけを持ち上げますが、ポーリングするたびにそのように見えます。Awsインテグレーションスプリング:sqsから1アイテムのみを保証する

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
     executor.setCorePoolSize(2); 
     executor.setMaxPoolSize(2); 
     executor.setQueueCapacity(10); 
     executor.setThreadNamePrefix("test-"); 
     executor.initialize(); 
     return executor; 

     new SqsMessageDrivenChannelAdapter(amazon)); 
     adapter.setMaxNumberOfMessages(1); 
     adapter.setSendTimeout(2000); 
     adapter.setVisibilityTimeout(1200); 
     adapter.setWaitTimeOut(20); 
     adapter.setTaskExecutor(this.asyncTaskExecutor()); 

この問題は、ThreadPoolTask​​Executor内で発生していると思われます。キューのサイズが10になると、これが満杯になるまで毎回持ち上がりますか? 1にmaxPoolSizeを設定

は、問題があなたのThreadPoolExecutorがこのキューからメッセージを消費するために2つのスレッドを使用して10のBlockingQueueの大きさに設定されていることである

Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.[email protected]406354e5 rejected from [email protected][Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0] 
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) 
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) 
    ... 6 common frames omitted 

答えて

1

を引き起こします。したがって、いつでもメッセージ上で同時に2つのスレッドを処理できます。 PoolSizeを1に設定すると、指定した時間に1つのメッセージしか処理されないことが保証されます。ソースコードから

/* 
* Proceed in 3 steps: 
* 
* 1. If fewer than corePoolSize threads are running, try to 
* start a new thread with the given command as its first 
* task. The call to addWorker atomically checks runState and 
* workerCount, and so prevents false alarms that would add 
* threads when it shouldn't, by returning false. 
* 
* 2. If a task can be successfully queued, then we still need 
* to double-check whether we should have added a thread 
* (because existing ones died since last checking) or that 
* the pool shut down since entry into this method. So we 
* recheck state and if necessary roll back the enqueuing if 
* stopped, or start a new thread if there are none. 
* 
* 3. If we cannot queue task, then we try to add a new 
* thread. If it fails, we know we are shut down or saturated 
* and so reject the task. 

あなたは第三のケースをヒットしています。

+0

これは私の最初の考えではこれがうまくいきませんでした。 @Gandalf – user101010101

+0

これを理解するにはもっと多くのコードが必要になるでしょう。 GitHub? – Gandalf

関連する問題