2013-07-16 14 views
5

先物を組み合わせるには良いヒントがありますwith timeouts。 私はスカラの将来のシーケンスとタイムアウト処理

私の最初のアプローチは、この

import scala.concurrent._ 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext.Implicits._ 

object FutureSequenceScala extends App { 
    println("Creating futureList") 

    val timeout = 2 seconds 
    val futures = List(1000, 1500, 1200, 800, 2000) map { ms => 
    val f = future { 
     Thread sleep ms 
     ms toString 
    } 
    Future firstCompletedOf Seq(f, fallback(timeout)) 
    } 

    println("Creating waitinglist") 
    val waitingList = Future sequence futures 
    println("Created") 

    val results = Await result (waitingList, timeout * futures.size) 
    println(results) 

    def fallback(timeout: Duration) = future { 
    Thread sleep (timeout toMillis) 
    "-1" 
    } 
} 

のように見える今後のシーケンスsequenceOfFuturesでこれを行うにはどのように興味が先物の順序でタイムアウトを処理するためのより良い方法があるか、このaは有効なソリューションですか?

答えて

8

あなたのコードに再考したいことがいくつかあります。まず、タイムアウトをシミュレートする唯一の目的を持っているExecutionContextにタスクを提出し、Thread.sleepを使用したことについては、私は大ファンではありません。 sleep呼び出しがブロックされているため、一定時間待機するために完全にブロックされているタスクを実行コンテキスト内に置かないでください。私は私の答えhereから盗むつもりで、純粋なタイムアウト処理のために、私はその答えに概説されているようなものを使うべきだと提案します。 HashedWheelTimerは、非常に効率的なタイマー実装で、スリープ状態のタスクよりタイムアウト処理に適しています。

あなたがそのルートに行くならば、私が提案する次の変更は、将来の個々のタイムアウトに関連する障害を処理することに関係します。個々の失敗がsequenceコールから返された合計Futureを完全に失敗させたい場合は、何もしないでください。あなたは、あなたが取ることができる、ということをやった後は

withTimeout(someFuture).recover{ 
    case ex:TimeoutException => someDefaultValue 
} 

:あなたはそれが起こると、その代わり、タイムアウトではなく、いくつかのデフォルト値を返すようにしたいしたくない場合は、このようFuturerecoverを使用することができます非ブロックのコールバックの利点と、このような何か:

waitingList onComplete{ 
    case Success(results) => //handle success 
    case Failure(ex) => //handle fail 
} 

各未来はタイムアウトを持っているので、ちょうど無限に実行されませんが。そこでブロックするIMOは必要ありません。また、atMostパラメータでAwait.resultにタイムアウト処理の追加層を提供する必要はありません。しかし、私はあなたがノンブロッキングのアプローチで大丈夫だと仮定していると思います。あなたが本当にブロックする必要がある場合は、timeout * futures.size時間を待つべきではありません。これらの先物は並行して実行されています。タイムアウトは、先物自体の個々のタイムアウト(またはCPU /タイミングの遅れを考慮するにはちょっとだけ長い)と同じくらい長くする必要があります。確かにタイムアウト*先物の総数であってはなりません。

+0

好奇心として、 'HasherWheelTimer'は' TimerTask'や 'newScheduledThreadPoolExecutor'よりどのように効率的ですか?どちらも同じ仕事をしています。 – Jatin

+0

@Jatin、私はあなたがこの情報をもっと詳しく知ることができると思う:http://stackoverflow.com/questions/15347600/which-is-more-efficient-nettys-hashedwheeltimer-or-quartzs-scheduler。しかし、それが心から、より多くのタスクを追加することは、より多くのリソースを消費すべきではありません。 'Timer'や' TimerTask'のようなもので、より一定の時間(消費されるシステムリソースの観点から)に基づくタイマーになっています。短時間のタイムアウトに基づくタスクをたくさんスケジューリングする高スループットシステムでは、リソース使用量が一定のため、より良い解決策です。 – cmbaxter

+0

しかし、 '' hashedWheelTimer'と比較して、 '' coresize'' 1''で 'STPE'はもっと多くのリソースを消費しますか?申し訳ありませんが、私はそれを手に入れません。 'STPE'は内部ヒープ' O(log(n)) 'によって挿入時間が長くなりますが、ティックタイムは小さくなります。 – Jatin

1

ここでは、ブロックfallbackがどの程度悪いかを示すバージョンがあります。

executorはシングルスレッドであり、多くのフォールバックを作成していることに注意してください。

@cmbaxterが正しく、マスタータイムアウトはtimeout * futures.sizeではありません。

@cmbaxterでもノンブロッキングと思っています。一度それをしてタイムアウトをかけたいなら、タイマーコンポーネントを選び、彼のリンクされた答え(リンクされた答えからリンクされている)を見てください。

私はまだmy answer from your linkが好きです。これは、ループが次のことを待っている限り、タイムアウトは本当に簡単です。

これは、先物とそのタイムアウトとフォールバック値のリストを取るだけです。

はたぶん、このようなシンプルなアプリとしてそのためのユースケースがあることだけで(あなたのテストのような)いくつかの結果を得るためにブロックし、終了しない結果がでている必要があります前に、何が起こった

import scala.concurrent._ 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext 

import java.util.concurrent.Executors 
import java.lang.System.{ nanoTime => now } 

object Test extends App { 
    //implicit val xc = ExecutionContext.global 
    implicit val xc = ExecutionContext fromExecutorService (Executors.newSingleThreadExecutor) 

    def timed[A](body: =>A): A = { 
    val start = now 
    val res = body 
    val end = now 
    Console println (Duration fromNanos end-start).toMillis + " " + res 
    res 
    } 
    println("Creating futureList") 

    val timeout = 1500 millis 
    val futures = List(1000, 1500, 1200, 800, 2000) map { ms => 
    val f = future { 
     timed { 
     blocking(Thread sleep ms) 
     ms toString 
     } 
    } 
    Future firstCompletedOf Seq(f, fallback(timeout)) 
    } 

    println("Creating waitinglist") 
    val waitingList = Future sequence futures 
    println("Created") 

    timed { 
    val results = Await result (waitingList, 2 * timeout * futures.size) 
    println(results) 
    }  
    xc.shutdown 

    def fallback(timeout: Duration) = future { 
    timed { 
     blocking(Thread sleep (timeout toMillis)) 
     "-1" 
    } 
    } 
} 

:。

Creating futureList 
Creating waitinglist 
Created 
1001 1000 
1500 -1 
1500 1500 
1500 -1 
1200 1200 
1500 -1 
800 800 
1500 -1 
2000 2000 
1500 -1 
List(1000, 1500, 1200, 800, 2000) 
14007()