2013-08-21 12 views
8

メッセージベースのノンブロッキングアプリケーションをakkaで作成する方法を理解していて、 同時操作を実行してメッセージに集計結果を戻す例を簡単にモックアップできます。私の難しさは、私のアプリケーションがHTTPリクエストに応答しなければならないときに、私の ノンブロッキングオプションが何であるかを理解することです。目標は、要求を受け取り、 をローカルまたはリモートの俳優にすぐに渡して、作業を行います。その結果、結果として に時間がかかることがあります。このモデルでは不幸にも、「尋ねる」をブロックするのではなく、「ブロックする」ブロックを非ブロックで表現する方法を理解できません。チェーンのどこかでtellを使用すると、私はもはや の最終的な応答コンテンツ(この場合、finagleであるが、 ではないHTTPフレームワークインターフェイスで必要です)として使用することはできません。私はリクエストが独自のスレッドであると理解しています。私の例はかなり工夫されていますが、ちょうど に私のデザインオプションを理解しようとしています。HTTP応答が要求されたときの非ブロック化オプション

要約すると、以下の私の考案した例を再加工してブロックすることができない場合、私はどのように理解するのが大好きです。これは私の akkaの最初の使用は、1年前の光の探査以来、そして私が見たすべての記事、文書、およびトークでは、 はサービスをブロックしないと言います。

概念的な回答が参考になるかもしれませんが、すでに読んでいるものと同じかもしれません。私の例である の作業/編集は、私が解決しようとしている正確な問題を理解するうえで鍵となるでしょう。現在の例が一般的に の場合は、その確認も役立ちますので、存在しない魔法は検索しません。

注次の別名:輸入com.twitter.util {フューチャー=> TwitterFuture、待つ=> TwitterAwait}提供する任意の指導を事前に

object Server { 

    val system = ActorSystem("Example-System") 

    implicit val timeout = Timeout(1 seconds) 

    implicit def scalaFuture2twitterFuture[T](scFuture: Future[T]): TwitterFuture[T] = { 
    val promise = TwitterPromise[T] 
    scFuture onComplete { 
     case Success(result) ⇒ promise.setValue(result) 
     case Failure(failure) ⇒ promise.setException(failure) 
    } 
    promise 
    } 

    val service = new Service[HttpRequest, HttpResponse] { 
    def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match { 
     case "https://stackoverflow.com/a/b/c" => 
     val w1 = system.actorOf(Props(new Worker1)) 

     val r = w1 ? "take work" 

     val response: Future[HttpResponse] = r.mapTo[String].map { c => 
      val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
      resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
      resp 
     } 

     response 
    } 
    } 

//val server = Http.serve(":8080", service); TwitterAwait.ready(server) 

    class Worker1 extends Actor with ActorLogging { 
    def receive = { 
     case "take work" => 
     val w2 = context.actorOf(Props(new Worker2)) 
     pipe (w2 ? "do work") to sender 
    } 
    } 

    class Worker2 extends Actor with ActorLogging { 
    def receive = { 
     case "do work" => 
     //Long operation... 
     sender ! "The Work" 
    } 
    } 

    def main(args: Array[String]) { 
    val r = service.apply(
     com.twitter.finagle.http.Request("https://stackoverflow.com/a/b/c") 
    ) 
    println(TwitterAwait.result(r).getContent.toString(CharsetUtil.UTF_8)) // prints The Work 
    } 
} 

ありがとう!

pipe(w2 ? "do work") to sender 

の代わりに:あなたはWorker1にあなたが書くだろう、pipe pattern -ieを使用してメッセージとして将来を送るのを避けることができ

答えて

5

sender ! (w2 ? "do work") 

を今rFuture[String]になりますFuture[Future[String]]の代わりに


更新:上記pipeソリューションは、あなたの俳優が将来で応答を避けるための一般的な方法です。ヴィクトルは以下のコメントで指摘するように、この場合には、あなたが取ることができ、あなたのWorker1完全にそれ(Worker1)からのメッセージを得た俳優に直接応答しWorker2を伝えることにより、ループの外:

w2.tell("do work", sender) 

このウォン(mapw2 ? "do work"に、flatMapまたはfor(複数の先物を組み合わせて)などを使用して、がWorker2の応答を操作する必要がある場合はオプションとなりますが、これは必須ではありませんよりクリーンで効率的です。


これは1つを殺すAwait.resultです。あなたは、次のような何かを書くことで他のを取り除くことができます。

val response: Future[HttpResponse] = r.mapTo[String].map { c => 
    val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
    resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
    resp 
} 

は今、あなただけのTwitterFutureにこのFutureをオンにする必要があります。私は私の頭の上から正確にこれを行う方法を教えてもらえませんが、それはfairly trivialでなければならず、間違いなくブロックする必要はありません。

+0

ご連絡ありがとうございました。それは先物を清算して、少し待つ。だから、あなたは2つの尋問の使用が実際に必要であると信じています(明らかに私はそれをそのようにしか見ることができませんでしたが、確かめたいと思いました)?あなたの改善点を含めるようにコードを更新し、akka futureからtwitter futureへの暗黙的な変換を含めます。 Stack Overflowのエチケットにはまだ慣れていないので、私は改善のために+1を与えています。尋問に関する追加情報は参考になります。ありがとう! – Eric

+0

これらの俳優が何を担当しているかについてもっと知らずに尋問が必要かどうかは言い難いですが、重要なのはブロッキングを必要としないということです(余計な簿記がありますが、まだ非同期です)。私はまた、Twitterと標準ライブラリ先物の間の変換を明示的に保持することをお勧めします。変換メソッドを呼び出すことは、通常、このような場合に潜在的な混乱を避けるために支払う小さな代金です。 –

+0

このトラヴィスの助けと洞察に感謝します。これは私の懸念を完全に解決します。 – Eric

0

あなたは間違いなくここでブロックする必要はありません。それはあなたがapplyメソッドから返されますFutureのIMPLだとして、あなたがさえずりPromiseが必要になります

import com.twitter.util.{Future => TwitterFuture, Await => TwitterAwait, Promise => TwitterPromise} 

:まず、にTwitterのもののためのインポートを更新します。次に、Travis Brownが答えたところであなたの俳優がネストされた未来を持っていないように応答していることに従ってください。あなたがそれを行うならば、あなたはこのような何かにあなたのapply方法を変更することができる必要があります:

def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match { 
    case "https://stackoverflow.com/a/b/c" => 
    val w1 = system.actorOf(Props(new Worker1)) 

    val r = (w1 ? "take work").mapTo[String] 
    val prom = new TwitterPromise[HttpResponse] 
    r.map(toResponse) onComplete{ 
     case Success(resp) => prom.setValue(resp) 
     case Failure(ex) => prom.setException(ex)    
    } 

    prom 
} 

def toResponse(c:String):HttpResponse = { 
    val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
    resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
    resp 
} 

これはおそらく、もう少し作業が必要です。私はIDEで設定していないので、コンパイルすることは保証できませんが、私はそのアイディアが妥当であると信じています。 applyメソッドから返される内容は、まだ完了していないTwitterFutureです。これは、俳優からの未来が(?)が尋ねられ、それがノンブロッキングのonCompleteコールバックを介して行われているときに完了します。

+0

私は、暗黙的と変更で私の答えを更新して、リフレッシュして、本質的に機能的にインラインであるあなたの答えを見た。時間をとっていただきありがとうございます。私は両方のあなたの答えによって、質問が実際には大丈夫だと仮定します。それはあなたの方法かトラビスが提案した方法のいずれかによって待っていることを殺すための運動でした。再度、感謝します。 – Eric

+0

将来の変換をこのようなレスポンスビルディングロジックでロールアップすることは、私にとっては理想的ではないようです。特に、変換が他の場所でも必要である場合はそうです。将来のマッピングと変換の上にこのアプローチを提案する理由はありますか? –

+0

@TravisBrown、例を簡単にするため、 'onComplete'を実行する前に変換する最初の' map'を秒で更新します。これはあなたが正しいことを話していたことですか? – cmbaxter

関連する問題