メッセージベースのノンブロッキングアプリケーションをakkaで作成する方法を理解していて、 同時操作を実行してメッセージに集計結果を戻す例を簡単にモックアップできます。私の難しさは、私のアプリケーションがHTTPリクエストに応答しなければならないときに、私の ノンブロッキングオプションが何であるかを理解することです。目標は、要求を受け取り、 をローカルまたはリモートの俳優にすぐに渡して、作業を行います。その結果、結果として に時間がかかることがあります。このモデルでは不幸にも、「尋ねる」をブロックするのではなく、「ブロックする」ブロックを非ブロックで表現する方法を理解できません。チェーンのどこかでtellを使用すると、私はもはや の最終的な応答コンテンツ(この場合、finagleであるが、 ではないHTTPフレームワークインターフェイスで必要です)として使用することはできません。私はリクエストが独自のスレッドであると理解しています。私の例はかなり工夫されていますが、ちょうど に私のデザインオプションを理解しようとしています。HTTP応答が要求されたときの非ブロック化オプション
要約すると、以下の私の考案した例を再加工してブロックすることができない場合、私はどのように理解するのが大好きです。これは私の akkaの最初の使用は、1年前の光の探査以来、そして私が見たすべての記事、文書、およびトークでは、 はサービスをブロックしないと言います。
概念的な回答が参考になるかもしれませんが、すでに読んでいるものと同じかもしれません。私の例である の作業/編集は、私が解決しようとしている正確な問題を理解するうえで鍵となるでしょう。現在の例が一般的に の場合は、その確認も役立ちますので、存在しない魔法は検索しません。
注次の別名:輸入com.twitter.util {フューチャー=> TwitterFuture、待つ=> TwitterAwait}提供する任意の指導を事前に
object Server {
val system = ActorSystem("Example-System")
implicit val timeout = Timeout(1 seconds)
implicit def scalaFuture2twitterFuture[T](scFuture: Future[T]): TwitterFuture[T] = {
val promise = TwitterPromise[T]
scFuture onComplete {
case Success(result) ⇒ promise.setValue(result)
case Failure(failure) ⇒ promise.setException(failure)
}
promise
}
val service = new Service[HttpRequest, HttpResponse] {
def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match {
case "https://stackoverflow.com/a/b/c" =>
val w1 = system.actorOf(Props(new Worker1))
val r = w1 ? "take work"
val response: Future[HttpResponse] = r.mapTo[String].map { c =>
val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
resp
}
response
}
}
//val server = Http.serve(":8080", service); TwitterAwait.ready(server)
class Worker1 extends Actor with ActorLogging {
def receive = {
case "take work" =>
val w2 = context.actorOf(Props(new Worker2))
pipe (w2 ? "do work") to sender
}
}
class Worker2 extends Actor with ActorLogging {
def receive = {
case "do work" =>
//Long operation...
sender ! "The Work"
}
}
def main(args: Array[String]) {
val r = service.apply(
com.twitter.finagle.http.Request("https://stackoverflow.com/a/b/c")
)
println(TwitterAwait.result(r).getContent.toString(CharsetUtil.UTF_8)) // prints The Work
}
}
ありがとう!
pipe(w2 ? "do work") to sender
の代わりに:あなたはWorker1
にあなたが書くだろう、pipe pattern -ieを使用してメッセージとして将来を送るのを避けることができ
ご連絡ありがとうございました。それは先物を清算して、少し待つ。だから、あなたは2つの尋問の使用が実際に必要であると信じています(明らかに私はそれをそのようにしか見ることができませんでしたが、確かめたいと思いました)?あなたの改善点を含めるようにコードを更新し、akka futureからtwitter futureへの暗黙的な変換を含めます。 Stack Overflowのエチケットにはまだ慣れていないので、私は改善のために+1を与えています。尋問に関する追加情報は参考になります。ありがとう! – Eric
これらの俳優が何を担当しているかについてもっと知らずに尋問が必要かどうかは言い難いですが、重要なのはブロッキングを必要としないということです(余計な簿記がありますが、まだ非同期です)。私はまた、Twitterと標準ライブラリ先物の間の変換を明示的に保持することをお勧めします。変換メソッドを呼び出すことは、通常、このような場合に潜在的な混乱を避けるために支払う小さな代金です。 –
このトラヴィスの助けと洞察に感謝します。これは私の懸念を完全に解決します。 – Eric