2012-12-03 18 views
5

私は、http呼び出しの処理を担当するAkkaの俳優を務めています。私は、APIを介して複数のHTTPリクエストを送信するためにScalaの派遣を使用します。resultHandler機能で(akka)Actorで複数の約束を処理する方法は?

urls.foreach { u 
    val service = url(u) 
    val promise = Http(service OK as.String).either 
    for(p <- promise) 
    { 
    p match 
    { 
     case Left(error) => 
     faultHandler(error) 
     case Right(result) => 
     resultHandler(result) 
    } 
    } 

、私はインスタンス変数nbOfResultsをインクリメントし、私が行ったコールの数と比較します。

def resultHandler(result:String) 
{ 
    this.nbOfResults++ 
    ... 
    if(nbOfResults == nbOfCalls) 
    // Do something 
} 

安全ですか? 2つの呼び出しが同時に結果を返す場合は、nbOfResultsに同時にアクセスできますか?

今のところ、アクターは多かれ少なかれスレッドと同等であると考えられ、したがってコールバック関数は同時に実行されません。それが正しいか ?

+0

以下の回答含まれ、私ははっきりとはい、あなたのレコードの状態したいのですが非同期コールバックに注意する必要がありますが、それらは同時に実行されます。つまり、上のコードのnbOfResultsの処理が間違っています。 –

答えて

3

は発送のみを使用したアレクセイ・ロマノフ応答の変形されています。それを行う方法についての提案を

//Promises will be of type Array[Promise[Either[Throwable, String]]] 
val promises = urls.map { u => 
    val service = url(u) 

    Http(service OK as.String).either 
} 

//Http.promise.all transform an Iterable[Promise[A]] into Promise[Iterable[A]] 
//So listPromise is now of type Promise[Array[Either[Throwable, String]]] 
val listPromise = Http.promise.all(promises) 

for (results <- listPromise) { 
    //Here results is of type Array[Either[Throwable, String]] 

    results foreach { result => 
     result match { 
      Left(error) => //Handle error 
      Right(response) => //Handle response 
     } 
    } 
} 
2

はるかに良い方法はあり:

val promises = urls.map {u => 
    val service = url(u) 
    val promise = Http(service OK as.String).either 
} 

val listPromise = Future.sequence(promises) 

listPromise.onComplete { whatever } 
2

は、私は彼の答えにアレクセイ・ロマノフに同意します。どのようにしてあなたのhttp要求を同期させるかは、あなたが約束の完了を処理する方法に注意してください。あなたの直感は、同時アクセスが俳優の状態に現れるかもしれない点で正しいです。

def resultHandler(result: String) { 
    //on completion we are sending the result to the actor who triggered the call 
    //as a message 
    self ! HttpComplete(result) 
} 

とアクターの受信機能で

は:

def receive = { 
    //PROCESS OTHER MESSAGES HERE 
    case HttpComplete(result) => //do something with the result 
} 

この方法では、あなたがHTTPの結果を処理することは違反しないことを確認してこれを処理するためのより良い方法は、このような何かをすることですしばらく(真)、のcompareAndSet - AtomicReference CASでそれ

1
val nbOfResults = new java.util.concurrent.atomic.AtomicInteger(nbOfCalls) 

// After particular call was ended  
if (nbOfResults.decrementAndGet <= 0) { 
    // Do something 
} 

[EDIT]削除古い答えを行うための適切な方法がある外部からの、しかし、アクターの受信ループから俳優の状態、ここなど

+0

incrementAndGetの何が問題なのですか? –

+0

提案を考慮して、異形を追加しました – idonnie

関連する問題