2017-05-31 28 views
1

Apache Camel 2.19.0では、sedaキューのエグゼキュータがいっぱいになっているときに同時にメッセージを生成し、その結果を非同期にコンシューマsedaキューで消費したいと思っています。 その背後にあるユースケース:多くの行を含む大きなファイルを処理する必要があり、個々の行ごとに1つのメッセージがオーバーヘッドになりすぎるため、ファイル全体をヒープに収めることができないため、バッチを作成する必要があります。しかし、結局、私がトリガーしたすべてのバッチが正常に完了したかどうかを知る必要があります。 キューを迷惑メールにする背圧メカニズムが必要ですが、同時にマルチスレッド処理を活用したいと考えています。Apache Camel:非同期操作とバックプレッシャー

ここでは、CamelとSpringの簡単な例を示します。私が設定したルート:

package com.test; 

import org.apache.camel.builder.RouteBuilder; 
import org.springframework.stereotype.Component; 

@Component 
public class AsyncCamelRoute extends RouteBuilder { 

    public static final String ENDPOINT = "seda:async-queue?concurrentConsumers=2&size=2&blockWhenFull=true"; 

    @Override 
    public void configure() throws Exception { 
     from(ENDPOINT) 
       .process(exchange -> { 
        System.out.println("Processing message " + (String)exchange.getIn().getBody()); 
        Thread.sleep(10_000); 
       }); 
    } 
} 

プロデューサーは次のようになります。

package com.test; 

import org.apache.camel.ProducerTemplate; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.context.event.ContextRefreshedEvent; 
import org.springframework.context.event.EventListener; 
import org.springframework.stereotype.Component; 

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.CompletableFuture; 

@Component 
public class AsyncProducer { 

    public static final int MAX_MESSAGES = 100; 

    @Autowired 
    private ProducerTemplate producerTemplate; 

    @EventListener 
    public void handleContextRefresh(ContextRefreshedEvent event) throws Exception { 
     new Thread(() -> { 
      // Just wait a bit so everything is initialized 
      try { 
       Thread.sleep(5_000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      List<CompletableFuture> futures = new ArrayList<>(); 

      System.out.println("Producing messages"); 
      for (int i = 0; i < MAX_MESSAGES; i++) { 
       CompletableFuture future = producerTemplate.asyncRequestBody(AsyncCamelRoute.ENDPOINT, String.valueOf(i)); 
       futures.add(future); 
      } 
      System.out.println("All messages produced"); 

      System.out.println("Waiting for subtasks to finish"); 
      futures.forEach(CompletableFuture::join); 
      System.out.println("Subtasks finished"); 
     }).start(); 

    } 
} 

このコードの出力は次のようになります。

Producing messages 
All messages produced 
Waiting for subtasks to finish 
Processing message 6 
Processing message 1 
Processing message 2 
Processing message 5 
Processing message 8 
Processing message 7 
Processing message 9 
... 
Subtasks finished 

だから、blockIfFullは無視され、すべてのメッセージされているようです処理の前に作成され、キューに置かれます。

メッセージを作成する方法はありますか?キャメルで非同期処理を使用すると同時に、処理されていない要素が多すぎる場合、キューに要素を配置することをブロックしますか?

+1

'asyncRequestBody(..)'の代わりに 'requestBody(..)'を試すことができますか?非同期メッセージの送信に使用されるプール内にブロックされたスレッドがたくさんある可能性があります。クライアントスレッドをブロックする代わりに。 – Ralf

+0

こんにちは@ラルフ、私はあなたのアプローチをよく理解していません - 消費者が終了するまで、requestBodyはクライアント(プロデューサ)をブロックします。消費者をスパムしている場合はクライアントをブロックしたいが、消費者がいる限りメッセージを作成する必要がある。しかし私はそれを別のアプローチで解決しました。 –

+1

それは正しいです。しかし、あなたが何か非同期を行うと、別のスレッドがsedaに提出して応答を待つ作業をしています。ループを実行し、 'asyncRequestBody(..)'を呼び出すスレッドは、非同期タスクを処理するスレッドプールが使い果たされない限りブロックされません。しかし、スレッドがプール内で必要に応じて作成されると、ループスレッドがブロックされることはありません。 – Ralf

答えて

0

ストリーミングとカスタムスプリッタを使用して問題を解決しました。これを行うことで、ソース行を1行だけではなく行のリストを返すイテレータを使って分割することができます。これで、私は必要に応じてCamelを使用できるように思えます。

.split().method(new SplitterBean(), "splitBody").streaming().parallelProcessing().executorService(customExecutorService) 

上記のような振る舞いを持つカスタムメイドスプリッタ付:

だからルートは、以下の部分が含まれています。

関連する問題