私が取り組んでいるプロジェクトでは、SQSからのメッセージを読む必要があり、Akkaを使ってこれらのメッセージの処理を分散することに決めました。Akka、SQS、Camelによるコンシューマーポーリングレート
SQSはCamelのサポートであり、ConsumerクラスのAkkaで使用するための機能が組み込まれているため、エンドポイントを実装してこのようなメッセージを読むのが最善であると想像しました。そうしている人たち。
私の問題は、待ち行列を空にするか、空に近い状態にするのに十分な速さでポーリングできないということです。私が元々考えていたのは、コンシューマーがSQSからキャメルを介してX/sのレートでメッセージを受け取ることができるということでした。そこから、より多くの消費者を作って、処理されたメッセージを必要とする割合に達することができました。
マイ消費者:示されているように
import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}
class MyConsumer() extends Consumer {
def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
var count = 0
def receive = {
case msg: CamelMessage => {
count += 1
}
case _ => {
println("Got something else")
}
}
override def postStop(){
println("Count for actor: " + count)
}
}
、私はメッセージのレートを向上させるためにdelay=1
など&maxMessagesPerPoll=10
を設定しましたが、私は同じエンドポイントで複数のコンシューマを起動することができませんでしだ。
私はBy default endpoints are assumed not to support multiple consumers.
と私は、複数の消費者を産卵することは私の分のためのシステムを実行した後、出力メッセージがCount for actor: x
の代わりに、ある一つだけを消費者に与えるよう、これは、同様SQSエンドポイントにも当てはまると考えていることをドキュメントに読み込みます他のものはCount for actor: 0
を出力します。
これはすべて役に立ちます。私は、現在の単一のコンシューマでこの実装で約33メッセージ/秒を読むことができます。
これは、AkkaのSQSキューからメッセージを読み取る適切な方法ですか?もしそうなら、私はこれを外部に拡張して、私のメッセージ消費率を900メッセージ/秒に近づけることができますか?