2011-02-02 6 views
4

TryScanの使い方の例を見つけようとしていましたが、何も見つかりませんでした。TryScanをF#で正しく使う方法

私は何をしたいですか(簡略化した例):MailboxProcessorには の2種類のメッシュがあります。

  • 最初の1つはGetStateが現在の状態を返します。 GetStateメッセージが頻繁に送信されます。

  • 他のUpdateStateは非常に高価(時間がかかる)です。インターネットから何かをダウンロードし、それに応じて状態を更新する。 UpdateStateはまれにしか呼び出されません。

私の問題はある - メッセージGetStateがブロックされ、UpdateStateを提供しています直前まで待っています。だから、私はTryScanを使ってすべてGetStateメッセージを処理しようとしましたが、運がありませんでした。

私のコード例:

type Msg = GetState of AsyncReplyChannel<int> | UpdateState 
let mbox = MailboxProcessor.Start(fun mbox -> 
      let rec loop state = async { 
       // this TryScan doesn't work as expected 
       // it should process GetState messages and then continue 
       mbox.TryScan(fun m -> 
        match m with 
        | GetState(chnl) -> 
         printfn "G processing TryScan" 
         chnl.Reply(state) 
         Some(async { return! loop state}) 
        | _ -> None 
       ) |> ignore 

       let! msg = mbox.Receive() 
       match msg with 
       | UpdateState -> 
        printfn "U processing" 
        // something very time consuming here... 
        async { do! Async.Sleep(1000) } |> Async.RunSynchronously 
        return! loop (state+1) 
       | GetState(chnl) -> 
        printfn "G processing" 
        chnl.Reply(state) 
        return! loop state 
      } 
      loop 0 
) 

[async { for i in 1..10 do 
      printfn " U" 
      mbox.Post(UpdateState) 
      async { do! Async.Sleep(200) } |> Async.RunSynchronously 
}; 
async { // wait some time so that several `UpdateState` messages are fired 
     async { do! Async.Sleep(500) } |> Async.RunSynchronously 
     for i in 1..20 do 
      printfn "G" 
      printfn "%d" (mbox.PostAndReply(GetState)) 
}] |> Async.Parallel |> Async.RunSynchronously 

あなたがコードを実行しようとした場合、あなたはそれがその結果を待つためGetStateメッセージはほとんど、処理されないことを、表示されます。一方、UpdateStateは火災と忘れのため、効果的に状態を取得できません。私の作品

編集

現在のソリューションは、このいずれかになります。コメントへの

type Msg = GetState of AsyncReplyChannel<int> | UpdateState 
let mbox = MailboxProcessor.Start(fun mbox -> 
      let rec loop state = async { 
       // this TryScan doesn't work as expected 
       // it should process GetState messages and then continue 
       let! res = mbox.TryScan((function 
        | GetState(chnl) -> Some(async { 
          chnl.Reply(state) 
          return state 
         }) 
        | _ -> None 
       ), 5) 

       match res with 
       | None -> 
        let! msg = mbox.Receive() 
        match msg with 
         | UpdateState -> 
          async { do! Async.Sleep(1000) } |> Async.RunSynchronously 
          return! loop (state+1) 
         | _ -> return! loop state 
       | Some n -> return! loop n 
      } 
      loop 0 
) 

反応:並列にUpdateStateを実行し、他のMailboxProcessorまたはThreadPoolとのアイデアは素晴らしいです、私はそれを現在必要としません。 私がしたかったのは、すべてGetStateのメッセージを処理し、その後は他のメッセージを処理することでした。私は処理中にUpdateStateエージェントがブロックされていることに気にしません。

私は出力に問題が何であったかを紹介します:

// GetState messages are delayed 500 ms - see do! Async.Sleep(500) 
// each UpdateState is sent after 200ms 
// each GetState is sent immediatelly! (not real example, but illustrates the problem) 
U   200ms <-- issue UpdateState 
U processing   <-- process UpdateState, it takes 1sec, so other 
U   200ms  5 requests are sent; sent means, that it is 
U   200ms  fire-and-forget message - it doesn't wait for any result 
          and therefore it can send every 200ms one UpdateState message 
G      <-- first GetState sent, but waiting for reply - so all 
          previous UpdateState messages have to be processed! = 3 seconds 
          and AFTER all the UpdateState messages are processed, result 
          is returned and new GetState can be sent. 
U   200ms 
U   200ms  because each UpdateState takes 1 second 
U   200ms 
U processing 
U 
U 
U 
U 
U processing 
G processing   <-- now first GetState is processed! so late? uh.. 
U processing   <-- takes 1sec 
3 
G 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
G processing   <-- after MANY seconds, second GetState is processed! 
10 
G 
G processing 
// from this line, only GetState are issued and processed, because 
// there is no UpdateState message in the queue, neither it is sent 
+0

'TryScan'コールの後に' |> ignore'を付けても、あなたがAPIを間違って使っているという事実を警告しなければなりません。 (今、完全な答えのための時間がない、誰かが私にそれを打つことを願って) – Brian

+0

私は間違った方法でそれを使用することを知っています。しかし、私はそれをまったく使用することについての投稿は見つかりませんでした。 – stej

+0

「TryScan」と「Scan」のポイントは、メッセージが受信されない場合はタイムアウトを待つことです。この2つの唯一の違いは、タイムアウト時にTryScanがオプションを返し、Scanが例外を発生させることです。 – gradbot

答えて

4

私はTryScan方法は、このシナリオであなたを助けることはないと思います。メッセージを待つ間に使用されるタイムアウトを指定することができます。メッセージが受信されると、メッセージの処理が開始されます(タイムアウトは無視されます)。例えば

、あなたがチェックし、いくつかの他のいくつかの特定のメッセージを待つが、実行したい場合は毎秒に書くことができ(待っている間に):

let loop() = async { 
    let! res = mbox.TryScan(function 
    | ImportantMessage -> Some(async { 
      // process message 
      return 0 
     }) 
    | _ -> None) 
    match res with 
    | None -> 
     // perform some check & continue waiting 
     return! loop() 
    | Some n -> 
     // ImportantMessage was received and processed 
} 

を使用すると、メールボックスのプロセッサ際のブロッキングを避けるために何ができますかUpdateStateメッセージを処理していますか?メールボックスプロセッサは(論理的に)シングルスレッドです。おそらくUpdateStateメッセージの処理をキャンセルしたくないので、バックグラウンドで処理を開始し、処理が完了するまで待つことをお勧めします。UpdateStateを処理するコードは、メールボックスに何らかのメッセージを返すことができます(例:UpdateStateCompleted)。

let rec loop (state) = async { 
    let! msg = mbox.Receive() 
    match msg with 
    | GetState(repl) -> 
     repl.Reply(state) 
     return! scanning state 
    | UpdateState -> 
     async { 
     // complex calculation (runs in parallel) 
     mbox.Post(UpdateStateCompleted newState) } 
     |> Async.Start 
    | UpdateStateCompleted newState -> 
     // Received new state from background workflow 
     return! loop newState } 

を今すぐバックグラウンドタスクを並列に実行されている、あなたが変更可能な状態に注意する必要があること:ここで

が、これがどのように見えるかのスケッチです。また、メッセージを処理できる速度よりも速くUpdateStateメッセージを送信すると、問題が発生します。これは、たとえば、以前のリクエストをすでに処理しているときにリクエストを無視またはキューイングするなどの方法で修正できます。

+0

Thx、Tomáš。 'TryScan'をタイムアウトで使用しようとしますが、問題が起きた場合に備えて、他の' MailboxProcessor'を使用する可能性があります。 – stej

+0

@stej「UpdateState」メッセージが既に開始されている場合、タイムアウトはUpdateStateメッセージの処理をキャンセルしないことに注意してください(メッセージの処理中は、他のメッセージを並行して処理できません)。 –

+0

はい、わかりました。私は 'UpdateState'をキャンセルしたくありません。実際には、タイムアウトは必要ありません。キュー内のすべての 'GetState'メッセージを処理し、そのプロセスの後に他の(現在は' UpdateState'のみの)メッセージを処理するだけです。私は少し質問を更新します。 – stej

2

TomasはMailboxProcessorについて言及しているので、シングルスレッドです。状態ゲッターから別のスレッドで更新を実行するには別のMailboxProcessorが必要です。

#nowarn "40" 

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | UpdateState 

let runner_UpdateState = MailboxProcessor.Start(fun mbox -> 
    let rec loop = async { 
     let! state = mbox.Receive() 
     printfn "U start processing %d" !state 
     // something very time consuming here... 
     do! Async.Sleep 100 
     printfn "U done processing %d" !state 
     state := !state + 1 
     do! loop 
    } 
    loop 
) 

let mbox = MailboxProcessor.Start(fun mbox -> 
    // we need a mutiple state if another thread can change it at any time 
    let state = ref 0 

    let rec loop = async { 
     let! msg = mbox.Receive() 

     match msg with 
     | UpdateState -> runner_UpdateState.Post state 
     | GetState chnl -> chnl.Reply !state 

     return! loop 
    } 
    loop) 

[ 
    async { 
     for i in 1..10 do 
      mbox.Post UpdateState 
      do! Async.Sleep 200 
    }; 
    async { 
     // wait some time so that several `UpdateState` messages are fired 
     do! Async.Sleep 1000 

     for i in 1..20 do 
      printfn "G %d" (mbox.PostAndReply GetState) 
      do! Async.Sleep 50 
    } 
] 
|> Async.Parallel 
|> Async.RunSynchronously 
|> ignore 

System.Console.ReadLine() |> ignore 

出力:

U start processing 0 
U done processing 0 
U start processing 1 
U done processing 1 
U start processing 2 
U done processing 2 
U start processing 3 
U done processing 3 
U start processing 4 
U done processing 4 
G 5 
U start processing 5 
G 5 
U done processing 5 
G 5 
G 6 
U start processing 6 
G 6 
G 6 
U done processing 6 
G 7 
U start processing 7 
G 7 
G 7 
U done processing 7 
G 8 
G U start processing 8 
8 
G 8 
U done processing 8 
G 9 
G 9 
U start processing 9 
G 9 
U done processing 9 
G 9 
G 10 
G 10 
G 10 
G 10 

あなたはまたのThreadPoolを使用することができます。

open System.Threading 

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | SetState of int 
    | UpdateState 

let mbox = MailboxProcessor.Start(fun mbox -> 
    let rec loop state = async { 
     let! msg = mbox.Receive() 

     match msg with 
     | UpdateState -> 
      ThreadPool.QueueUserWorkItem((fun obj -> 
       let state = obj :?> int 

       printfn "U start processing %d" state 
       Async.Sleep 100 |> Async.RunSynchronously 
       printfn "U done processing %d" state 
       mbox.Post(SetState(state + 1)) 

       ), state) 
      |> ignore 
     | GetState chnl -> 
      chnl.Reply state 
     | SetState newState -> 
      return! loop newState 
     return! loop state 
    } 
    loop 0) 

[ 
    async { 
     for i in 1..10 do 
      mbox.Post UpdateState 
      do! Async.Sleep 200 
    }; 
    async { 
     // wait some time so that several `UpdateState` messages are fired 
     do! Async.Sleep 1000 

     for i in 1..20 do 
      printfn "G %d" (mbox.PostAndReply GetState) 
      do! Async.Sleep 50 
    } 
] 
|> Async.Parallel 
|> Async.RunSynchronously 
|> ignore 

System.Console.ReadLine()|>

+0

( 'state:=!state + 1')の演算子 '!'はアトミックですか?私は 'MailboxProcessor'を使って可変変数を避けています。この場合、私は困っていると思います。スレッドプールはうまく、thx。 – stej

+0

原子ではありません。不変の状態が必要な場合は、余分なSetStateとThreadPoolを使用します。 – gradbot

3

がTRYSCANを使用しないでください無視!

残念ながら、現在のバージョンのF#ではTryScanの機能が2つの方法で壊れています。まず、タイムアウトを指定するのがポイントですが、実装は実際にはそれを尊重しません。具体的には、無関係なメッセージがタイマーをリセットします。第2に、他のScan関数の場合と同様に、メッセージキューはスキャン中に他のスレッドがポスティングするのを防ぐロックの下で検査されます。これは、任意の長い時間になります。したがって、TryScan関数自体は、同時システムをロックする傾向があり、呼び出し側のコードがロック内で評価されるため、デッドロックが発生する可能性もあります(たとえば、関数引数からへのポストまたはTryScanへのポストは、すでにロックされているロックを取得する)。

私の生産コードの初期プロトタイプでTryScanを使用しましたが、問題は終了しませんでした。しかし、私はそれを中心にアーキテクトを行い、結果として得られたアーキテクチャは実際より優れていました。本質的には、私は熱心にReceiveすべてのメッセージとフィルタを自分のローカルキューを使用しています。

+0

かなり面白いです。しかし、正直言って、私はキューがターゲットメッセージを探している間にロックされることを期待していました。とにかく、それが本当であることとタイムアウトが奇妙な方法で動作するかもしれないことを知るためにgoot。ローカルキュー(最後の段落で言及したもの)を使用するコードを表示することができれば、それは素晴らしいことでしょう。 – stej

+1

私は、メールボックス(およびTPL)の実装が待ち状態でないことを期待しています。私は私の答えにコードを追加します... –

関連する問題