Apache Camel 2.19.0では、sedaキューのエグゼキュータがいっぱいになっているときに同時にメッセージを生成し、その結果を非同期にコンシューマsedaキューで消費したいと思っています。 その背後にあるユースケース:多くの行を含む大きなファイルを処理する必要があり、個々の行ごとに1つのメッセージがオーバーヘッドになりすぎるため、ファイル全体をヒープに収めることができないため、バッチを作成する必要があります。しかし、結局、私がトリガーしたすべてのバッチが正常に完了したかどうかを知る必要があります。 キューを迷惑メールにする背圧メカニズムが必要ですが、同時にマルチスレッド処理を活用したいと考えています。Apache Camel:非同期操作とバックプレッシャー
ここでは、CamelとSpringの簡単な例を示します。私が設定したルート:
package com.test;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class AsyncCamelRoute extends RouteBuilder {
public static final String ENDPOINT = "seda:async-queue?concurrentConsumers=2&size=2&blockWhenFull=true";
@Override
public void configure() throws Exception {
from(ENDPOINT)
.process(exchange -> {
System.out.println("Processing message " + (String)exchange.getIn().getBody());
Thread.sleep(10_000);
});
}
}
プロデューサーは次のようになります。
package com.test;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Component
public class AsyncProducer {
public static final int MAX_MESSAGES = 100;
@Autowired
private ProducerTemplate producerTemplate;
@EventListener
public void handleContextRefresh(ContextRefreshedEvent event) throws Exception {
new Thread(() -> {
// Just wait a bit so everything is initialized
try {
Thread.sleep(5_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<CompletableFuture> futures = new ArrayList<>();
System.out.println("Producing messages");
for (int i = 0; i < MAX_MESSAGES; i++) {
CompletableFuture future = producerTemplate.asyncRequestBody(AsyncCamelRoute.ENDPOINT, String.valueOf(i));
futures.add(future);
}
System.out.println("All messages produced");
System.out.println("Waiting for subtasks to finish");
futures.forEach(CompletableFuture::join);
System.out.println("Subtasks finished");
}).start();
}
}
このコードの出力は次のようになります。
Producing messages
All messages produced
Waiting for subtasks to finish
Processing message 6
Processing message 1
Processing message 2
Processing message 5
Processing message 8
Processing message 7
Processing message 9
...
Subtasks finished
だから、blockIfFullは無視され、すべてのメッセージされているようです処理の前に作成され、キューに置かれます。
メッセージを作成する方法はありますか?キャメルで非同期処理を使用すると同時に、処理されていない要素が多すぎる場合、キューに要素を配置することをブロックしますか?
'asyncRequestBody(..)'の代わりに 'requestBody(..)'を試すことができますか?非同期メッセージの送信に使用されるプール内にブロックされたスレッドがたくさんある可能性があります。クライアントスレッドをブロックする代わりに。 – Ralf
こんにちは@ラルフ、私はあなたのアプローチをよく理解していません - 消費者が終了するまで、requestBodyはクライアント(プロデューサ)をブロックします。消費者をスパムしている場合はクライアントをブロックしたいが、消費者がいる限りメッセージを作成する必要がある。しかし私はそれを別のアプローチで解決しました。 –
それは正しいです。しかし、あなたが何か非同期を行うと、別のスレッドがsedaに提出して応答を待つ作業をしています。ループを実行し、 'asyncRequestBody(..)'を呼び出すスレッドは、非同期タスクを処理するスレッドプールが使い果たされない限りブロックされません。しかし、スレッドがプール内で必要に応じて作成されると、ループスレッドがブロックされることはありません。 – Ralf