2013-11-04 8 views
6

私が取り組んでいるプロジェクトでは、SQSからのメッセージを読む必要があり、Akkaを使ってこれらのメッセージの処理を分散することに決めました。Akka、SQS、Camelによるコンシューマーポーリングレート

SQSはCamelのサポートであり、ConsumerクラスのAkkaで使用するための機能が組み込まれているため、エンドポイントを実装してこのようなメッセージを読むのが最善であると想像しました。そうしている人たち。

私の問題は、待ち行列を空にするか、空に近い状態にするのに十分な速さでポーリングできないということです。私が元々考えていたのは、コンシューマーがSQSからキャメルを介してX/sのレートでメッセージを受け取ることができるということでした。そこから、より多くの消費者を作って、処理されたメッセージを必要とする割合に達することができました。

マイ消費者:示されているように

import akka.camel.{CamelMessage, Consumer} 
import akka.actor.{ActorRef, ActorPath} 

class MyConsumer() extends Consumer { 
    def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)" 
    var count = 0 

    def receive = { 
    case msg: CamelMessage => { 
     count += 1 
    } 
    case _ => { 
     println("Got something else") 
    } 
    } 

    override def postStop(){ 
    println("Count for actor: " + count) 
    } 
} 

、私はメッセージのレートを向上させるためにdelay=1など&maxMessagesPerPoll=10を設定しましたが、私は同じエンドポイントで複数のコンシューマを起動することができませんでしだ。

私はBy default endpoints are assumed not to support multiple consumers.と私は、複数の消費者を産卵することは私の分のためのシステムを実行した後、出力メッセージがCount for actor: xの代わりに、ある一つだけを消費者に与えるよう、これは、同様SQSエンドポイントにも当てはまると考えていることをドキュメントに読み込みます他のものはCount for actor: 0を出力します。

これはすべて役に立ちます。私は、現在の単一のコンシューマでこの実装で約33メッセージ/秒を読むことができます。

これは、AkkaのSQSキューからメッセージを読み取る適切な方法ですか?もしそうなら、私はこれを外部に拡張して、私のメッセージ消費率を900メッセージ/秒に近づけることができますか?

答えて

5

Sadly Camelは現在、SQSでのメッセージの並列消費をサポートしていません。私はAWS-javaの-SDKを使用してバッチメッセージSQSをポーリングするために自分自身の俳優を書いているこの問題に対処するため

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

def receive = { 
    case BeginPolling => { 
     // re-queue sending asynchronously 
     self ! BeginPolling 
     // traverse the response 
     val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry] 
     val messages = sqs.receiveMessage(receiveMessageRequest).getMessages 
     messages.toList.foreach { 
     node => { 
      deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle)) 
      //log.info("Node body: {}", node.getBody) 
      filterSupervisor ! node.getBody 
     } 
     } 
     if(deleteEntryList.size() > 0){ 
     val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList) 
     sqs.deleteMessageBatch(deleteMessageBatchRequest) 
     } 
    } 

    case _ => { 
     log.warning("Unknown message") 
    } 
    } 

、これが最良の実装で、要求は常に空のキューをヒットされないように、それはもちろん、大幅に改善することができれば、私は確かではないんだけど、それがポーリングすることができるという私の現在のニーズに合わせて行います同じキューからのメッセージ

これでSQSから約133(メッセージ/秒)/ actorを取得します。

1

Camel 2.15はconcurrentConsumersをサポートしていますが、akka camel 2.15をサポートしているかどうかわかりません。消費者が複数いる場合でも1つのコンシューマーアクタを置くと違いがあります。

関連する問題