2012-02-13 5 views
3

Async CTPを使用してHTMLクローラを書き込もうとしていますが、これを達成するための再帰的な方法はありません。Async CTPを使用してHTMLページを同時にダウンロード

これはこれまでのコードです。

private readonly ConcurrentStack<LinkItem> _LinkStack; 
private readonly Int32 _MaxStackSize; 
private readonly WebClient client = new WebClient(); 

Func<string, string, Task<List<LinkItem>>> DownloadFromLink = async (BaseURL, uri) => 
{ 
    string html = await client.DownloadStringTaskAsync(uri); 
    return LinkFinder.Find(html, BaseURL); 
}; 

Action<LinkItem> DownloadAndPush = async (o) => 
{ 
    List<LinkItem> result = await DownloadFromLink(o.BaseURL, o.Href); 
    if (this._LinkStack.Count() + result.Count <= this._MaxStackSize) 
    { 
     this._LinkStack.PushRange(result.ToArray()); 
     o.Processed = true; 
    } 
}; 

Parallel.ForEach(this._LinkStack, (o) => 
{ 
    DownloadAndPush(o); 
}); 

しかし、私は、私は唯一の唯一の1つのアイテムを持っているParallel.ForEachは、最初の(そして唯一の繰り返し)を実行するため、一度に望んでいるだろうと明らかにこれは動作しません。 ForEachを再帰的にするために私が考えることができる最も単純なアプローチですが、私はすぐにスタック領域を使い果たしてしまいます。

MaxStackSizeに達するか、システムがメモリ不足になるまで項目を追加する再帰的継続として記述するものを作成するために、このコードをどのように再構成できるか教えてください。

+0

+1。再帰を制御する者は、宇宙を制御する! – toddmo

答えて

10

私はC#5/.Net 4.5を使用してこのようなことを行う最善の方法はTPL Dataflowを使用することだと思います。さらにa walkthrough on how to implement web crawler using itがあります。あなたが望む任意の値にMaxDegreeOfParallelismを設定することができます

var cts = new CancellationTokenSource(); 

Func<LinkItem, Task<IEnumerable<LinkItem>>> downloadFromLink = 
    async link => 
      { 
       // WebClient is not guaranteed to be thread-safe, 
       // so we shouldn't use one shared instance 
       var client = new WebClient(); 
       string html = await client.DownloadStringTaskAsync(link.Href); 

       return LinkFinder.Find(html, link.BaseURL); 
      }; 

var linkFinderBlock = new TransformManyBlock<LinkItem, LinkItem>(
    downloadFromLink, 
    new ExecutionDataflowBlockOptions 
    { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token }); 

基本的には、1つのURLをダウンロードして、そこからリンクを得ることの面倒を一つの「ブロック」を作成します。多くのURLを同時にダウンロードすることができます。これをまったく制限したくない場合は、DataflowBlockOptions.Unboundedに設定することができます。

次に、ダウンロードしたすべてのリンクを何らかの方法で処理する1つのブロックを作成します。すべてのリンクをすべてリストに保存するようにします。また、ダウンロードをキャンセルするタイミングを決定することができます

var links = new List<LinkItem>(); 

var storeBlock = new ActionBlock<LinkItem>(
    linkItem => 
    { 
     links.Add(linkItem); 
     if (links.Count == maxSize) 
      cts.Cancel(); 
    }); 

我々はスレッドセーフではないされ、ここで大丈夫でなければなりませんコレクションを使用することを意味1にMaxDegreeOfParallelismがデフォルトに設定されていなかったので。

ブロックをもう1つ作成します。linkFinderBlockからリンクを取得し、storeBlockに戻し、linkFinderBlockに戻します。

var broadcastBlock = new BroadcastBlock<LinkItem>(li => li); 

そのコンストラクタ内のラムダは「クローニング関数」です。必要に応じてアイテムのクローンを作成することができますが、作成後にLinkItemを変更しないので、ここでは必要ありません。

今、私たちは一緒にブロックを接続することができます(あなたもstoreBlockにそれを送信する場合、またはbroadcastBlock

linkFinderBlock.LinkTo(broadcastBlock); 
broadcastBlock.LinkTo(storeBlock); 
broadcastBlock.LinkTo(linkFinderBlock); 

その後、我々はlinkFinderBlockに最初のアイテムを与えることによって処理を開始することができます

linkFinderBlock.Post(firstItem); 

そして最後に、処理が完了するまで待つ:

try 
{ 
    linkFinderBlock.Completion.Wait(); 
} 
catch (AggregateException ex) 
{ 
    if (!(ex.InnerException is TaskCanceledException)) 
     throw; 
} 
+0

うわー!華麗な説明をありがとう。あなたはただ一つのことを確認できますか? MaxDegreeOfParallelismをnumber> 1に設定すると、コレクションの型をスレッドセーフであるためにConcurrentStackのように変更する必要がありますか? –

+0

'storeBlock'のコレクションを意味していますか?そして、あなたは 'MaxDegreeOfParallelism'をどこで設定しますか? 'storeBlock'の' MDOP'を> 1に設定すると、そこにあるスレッドセーフなコレクションを使用する必要があります(またはロックを使用する必要があります)。しかし、他のブロックの 'MDOP'を> 1に設定すると、' storeBlock'の並列性に影響しないので、スレッドセーフを考慮する必要はありません。 – svick

+0

これは私を2012年にアップグレードさせる予定です! +1 – toddmo

関連する問題