2011-08-09 14 views
2

私は今、MailboxProcessorで遊んでいます。したがって、私は、コンピュータ上のディレクトリをクロールすることができますいくつかの薬、およびすべてのサブディレクトリを作っ - その後、各ディレクトリ内のファイルを印刷:MailboxProcessor - いつ停止するかを教えてください。

let fileCollector = 
    MailboxProcessor.Start(fun self -> 
    let rec loop() = 
     async { let! file = self.Receive() 
       printfn "%s" file 
       return! loop() } 
    loop()) 

let folderCollector = 
    MailboxProcessor.Start(fun self -> 
    let rec loop() = 
     async { let! dir = self.Receive() 
       do! Async.StartChild(
        async { let! files = Directory.AsyncGetFiles dir 
          for z in files do fileCollector.Post z }) |> Async.Ignore 
       return! loop() } 
    loop()) 

let crawler = 
    MailboxProcessor.Start(fun self -> 
    let rec loop() = 
     async { let! dir = self.Receive() 
       folderCollector.Post dir 
       do! Async.StartChild(
        async { let! dirs = Directory.AsyncGetDirectories dir 
          for z in dirs do self.Post z }) |> Async.Ignore 
       return! loop() } 
    loop()) 

crawler.Post @"C:\Projects" 

printfn "Done" // Message getting fired right away, due to the async stuff. 

folderCollectorfileCollectorcrawlerが行われたときに、私は言うだろうか、最後にprintfnステートメントが呼び出され、クローラーがすべてのサブディレクトリーを正常にクロールしてすべてのファイルを印刷した後に呼び出されます。 http://tomasp.net/blog/parallel-extra-image-pipeline.aspxにトマスPetricekによって技術showenを使用することにより 、私は次のコード作ることができた:

更新

let folders = new BlockingQueueAgent<string>(100) 
let files = new BlockingQueueAgent<string>(100) 

let rec folderCollector path = 
    async { do! folders.AsyncAdd(path) 
      do! Async.StartChild(
        async { let! dirs = Directory.AsyncGetDirectories path 
          for z in dirs do 
          do! folderCollector z }) |> Async.Ignore } 

let fileCollector = 
    async { while true do 
      let! dir = folders.AsyncGet() 
      do! Async.StartChild(
        async { let! fs = Directory.AsyncGetFiles dir 
          for z in fs do 
           do! files.AsyncAdd z }) |> Async.Ignore } 

let rec printFiles() = 
    async { let! file = files.AsyncTryGet(75) 
      match file with 
      | Some s -> 
      printfn "%s" s 
      return! displayFiles() 
      | None ->() } 

let cts = new CancellationTokenSource() 
Async.Start(folderCollector @"C:\Projects", cts.Token) 
Async.Start(fileCollector, cts.Token) 
Async.RunSynchronously(printFiles(), cancellationToken = cts.Token) 

printfn "DONE!" 

更新:更新:さて、私

let folders = new BlockingQueueAgent<string option>(10) 
let files = new BlockingQueueAgent<string option>(10) 

let folderCollector path = 
    async { let rec loop path = 
      async { do! folders.AsyncAdd(Some path) 
        let! dirs = Directory.AsyncGetDirectories path 
        do! [ for z in dirs -> loop z ] |> Async.Parallel |> Async.Ignore } 
      do! loop path 
      do! folders.AsyncAdd(None) } 

let rec fileCollector() = 
    async { let! dir = folders.AsyncGet 125 
      match dir with 
      | Some s -> 
      let fs = Directory.GetFiles s 
      do! [ for z in fs -> printfn "%s" z; files.AsyncAdd(Some z) ] |> Async.Parallel |> Async.Ignore // <-- Fails silence if files are full 
      do! fileCollector() // <-- unreachable 
      | None -> printfn "Done!";()} 

そのトイレ:コード以下の混ざっ」VEのksええ、ええ?なんらかの理由で、fileCollector()関数のdo! fileCollector()行で、files BlockingQueueAgentがいっぱいの場合、 を実行しないでください。代わりに、それは沈黙に失敗します。私がしなければ

は、しかし:

let folderCollector path = 
    async { let rec loop path = 
      async { do! folders.AsyncAdd(Some path) 
        let! dirs = Directory.AsyncGetDirectories path 
        do! [ for z in dirs -> loop z ] |> Async.Parallel |> Async.Ignore } 
      do! loop path 
      do! folders.AsyncAdd(None) } 

let rec fileCollector() = 
    async { let! dir = folders.AsyncGet 75 
      match dir with 
      | Some s -> 
      let fs = Directory.GetFiles s 
      do! Async.StartChild(async { do! [ for z in fs -> printfn "%s" z; files.AsyncAdd(Some z) ] 
              |> Async.Parallel |> Async.Ignore }) |> Async.Ignore 
      do! fileCollector() 
      | None -> printfn "Done!";()} 

それだけで正常に動作します。しかし、今はfileCollectorが完了した時点を追跡することはできません。なぜなら、それは非同期計算の束を実行しているからです。したがって、キュー内の「なし」になっても、やる必要があるかもしれません。どうしたの?


更新: 私はfolderCollectorと同じ "スタイル" にfileCollectorを変更したが、問題が残っています。修正されたバージョン:

let fileCollector() = 
    async { let rec loop() = 
      async { let! dir = folders.AsyncGet 750 
        match dir with 
        | Some s -> 
         let! fs = Directory.AsyncGetFiles s 
         do! [ for z in fs -> printfn "%A" z; files.AsyncAdd(Some z) ] 
          |> Async.Parallel |> Async.Ignore 
         return! loop() 
        | None -> printfn "Done!";() } 
      do! loop() 
      printfn "after" // Never gets this far... 
      do! files.AsyncAdd(None) } 

答えて

3

- 私はあなたがBlockingQueueAgent<option<string>>を使用して、すべてのファイルを生成し終えたときに値Noneを使用することができると思います(None値はその後、パイプラインを介して伝播するだろうし、彼らはNoneを得るときすべてのワークフローを終了することができます)。

はそれを行うには、あなたはそれが反復を終了すると、実際に検出するfolderCollectorを変更する必要があります。 (ポイントは、再帰呼び出しの完了を待つ必要があるということです)テストされていないが、以下は動作するはずです:

let rec folderCollector path = 
    let rec loop path = 
    async { do! folders.AsyncAdd(Some path) 
      let! dirs = Directory.AsyncGetDirectories path 
      do! [ for z in dirs do -> folderCollector z ] 
       |> Async.Parallel |> Async.Ignore } 
    async { do! loop path 
      do! folders.AsyncAdd(None) } 

すべてのワークフローは、潜在的にAsyncGetの結果Noneになるだろう。その場合、パイプライン内の次のワーカーにNoneを送信する必要があります。最後の一つは、それがNone受信したときに終了することができます:)迅速な回答に

let rec printFiles() = 
    async { let! file = files.AsyncGet(75) // Note - now we use just AsyncGet 
      match file with 
      | Some s -> 
      printfn "%s" s 
      return! displayFiles() 
      | None ->() } // Completed processing all files 
+0

私の投稿更新を見てください! :)私はそれが私のものかどうかは分かりませんが、 'BlockingQueueAgent'にバグがあります(私はあなたのブログの投稿と同じコードをエージェントのタイプとして使っています)。 – ebb

+1

@ebb - 'fileCollector'を' folderCollector'と同様の方法で修正する必要があると思います(両者が終わりを追うことができるように)。 Aslo、私はfolderCollector' 'と同様の方法で、' filCollector'を変更しようとしました(更新記事を参照) –

+0

(それがより効率的です)、 '!代わりに'やるの再帰呼び出しのために '!'返す使用していますが、問題残っている...私は完全にシンプルなものを見落とさなければならないが、何が見えないのか。 – ebb

2

F#のエージェントが完了したときにあなたを通知するための組み込みのサポートはありません。実際には伝えるのはかなり難しいです。エージェントは、空のキューがあっても、他のエージェントからのメッセージを引き続き受信して再び作業を開始できるため、完了していません。

この例では、3つのエージェントすべてのキューが空の場合に作業が実行されます。これはCurrentQueueLengthを使用して確認できます。これは非常に素晴らしいソリューションではありませんが、それは動作します:

crawler.Post @"C:\Temp" 
// Busy waiting until all queues are empty 
while crawler.CurrentQueueLength <> 0 || folderCollector.CurrentQueueLength <> 0 || 
     fileCollector.CurrentQueueLength <> 0 do 
    System.Threading.Thread.Sleep(10) 
printfn "Done" 

私はより良いアプローチが異なってコードを構築することだと思う - あなたが本当に再帰的にディレクトリツリーを処理するための薬剤を使用する必要はありません。あなたのバージョンでは、ディレクトリ(crawlerエージェント)の歩行は、フォルダ(folderCollector)内のファイルを検索し、結果(fileCollector)を処理することと並行して行われるので、基本的に3段階のパイプラインを実装しています。

ただちにasyncを使用すると、処理の即時結果を格納するために使用されるブロッキングキューを使用して、パイプラインをより簡単に実装できます。これはarticle shows an example with image processingです。私は同じアプローチがあなたのためにも働くと思います。すべての入力を送信した後、完了を示す特別なメッセージを送信し、メッセージがパイプラインの最後に到達したら、完了している)。

もう1つの方法は、この種の問題の良いパターンであるかもしれないasynchronous sequencesを使用することです(ただし、現時点ではオンラインで良いサンプルがありません)。パイプラインに基づいて更新されたバージョンについて(コメントから)あなたの2番目の質問に答えるために

+0

感謝を - 私は実際にそれ以前の「画像処理」についてのあなたの記事を見てなかったが、 'BlockingQueueAgent'は' MAXLENGTHを取ったので、 'パラメータとして、私は単にファイル/フォルダの数を事前に知っていないので、単にそれをスキップしました。しかし、それも回避策があるように聞こえますか? – ebb

+0

@ebb - 'maxLength'引数は' Int32.MaxValue'に設定できますが、実際にはそうしたくないでしょう。アイテムの数が最大数に達すると、キューはアイテムをキューに追加し続けるワークフローをブロックします(アイテムがさらに処理されるまで)。これは、パイプライン全体でデータを十分に高速に処理できない場合に、キューのオーバーフィルを避けるために使用されます。 –

+0

@ebb - ...ファイル処理のサンプルでは、​​おそらくディスクに_all files_というキューを作成したくないでしょう。代わりに、あなたはいくつかのファイル数(例えば100)をキューに入れ、他のプロセスがカウントが保留キュー、いやああ –

関連する問題