2017-04-11 15 views
2

複数のURLを同時にクロールしようとしています。各リクエストは、クロールのためにさらに多くのURLをConcurrentBagに追加することがあります。現時点では、新しいURLを処理するために新しいParallel.ForEachを開始する厄介なwhile(true)があります。Parallel.ForEachに使用されているConcurrentBagにアイ​​テムを追加するC#

ConcurrentBagの内容に追加できる方法はありますか?Parallel.ForEachには新しいアイテムがあり、これらの新しいアイテムを繰り返していますか?

ConcurrentBag<LinkObject> URLSToCheck = new ConcurrentBag<LinkObject>(); 

while (true) 
{ 
    Parallel.ForEach(URLSToCheck, new ParallelOptions { MaxDegreeOfParallelism = 5 }, URL => 
    { 
     Checker Checker = new Checker(); 

     URLDownloadResult result = Checker.downloadFullURL(URL.destinationURL); 

     List<LinkObject> URLsToAdd = Checker.findInternalUrls(URL.sourceURL, result.html); 

     foreach (var URLToAdd in URLsToAdd) 
     { 
      URLSToCheck.Add(new LinkObject { sourceURL = URLToAdd.sourceURL, destinationURL = URLToAdd.destinationURL }); 
     } 
    }); 

    if(URLSToCheck.Count == 0)break; 
} 
+0

再帰的コードに潜んでいると便利です。これが適用される典型的な例です。 Btw、循環参照に注意してください。 – Stefan

+0

ありがとう、私はそれをチェックします! :-) – jamie

答えて

2

ここでDataFlowは便利です。 ActionBlockで、それはうまく行うことができます。その後、

// Capture the variable, so it can be used in the next block 
ActionBlock<LinkObject> = actionBlock = null; 

actionBlock = new ActionBlock<LinkObject>(URL => 
{ 
    Checker Checker = new Checker(); 
    URLDownloadResult result = Checker.downloadFullURL(URL.destinationURL); 
    List<LinkObject> URLsToAdd = Checker.findInternalUrls(URL.sourceURL, result.html); 
    URLsToAdd.ForEach(actionBlock.Post) 
},new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 5}); 

そしてactionBlockあなたの最初のURLを追加します。

actionBlock.Post(url1); 
actionBlock.Post(url2); 
... 
+0

ありがとう、これは本当に助けてくれました:-)他の誰かがこのインストールを使用しているなら、NuGetを通してMicrosoft.Tpl.Dataflow – jamie

3

あなたはBlockingCollectionを見てみることができます。

BlockingCollectionは、プロデューサ/コンシューマパターンの実装を提供します。プロデューサはブロックコレクションに追加され、Parallel.ForEachはコレクションから消費されます。あなたはBlockingCollection用のカスタムパーティショナーを実装する必要がありますそうするには

(理由はここで説明されていますhttps://blogs.msdn.microsoft.com/pfxteam/2010/04/06/parallelextensionsextras-tour-4-blockingcollectionextensions/

パーティション分割:

class BlockingCollectionPartitioner<T> : Partitioner<T> 
{ 
    private BlockingCollection<T> _collection; 

    internal BlockingCollectionPartitioner(BlockingCollection<T> collection) 
    { 
     if (collection == null) 
      throw new ArgumentNullException("collection"); 
     _collection = collection; 
    } 

    public override bool SupportsDynamicPartitions 
    { 
     get { return true; } 
    } 

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount) 
    { 
     if (partitionCount < 1) 
      throw new ArgumentOutOfRangeException("partitionCount"); 

     var dynamicPartitioner = GetDynamicPartitions(); 
     return Enumerable.Range(0, partitionCount).Select(_ => dynamicPartitioner.GetEnumerator()).ToArray(); 
    } 

    public override IEnumerable<T> GetDynamicPartitions() 
    { 
     return _collection.GetConsumingEnumerable(); 
    } 
} 

次に、あなたは次のようにそれを使用します

BlockingCollection<LinkObject> URLSToCheck = new BlockingCollection<LinkObject>(); 

Parallel.ForEach(
    new BlockingCollectionPartitioner<LinkObject>(URLSToCheck), 
    new ParallelOptions { MaxDegreeOfParallelism = 5 }, URL => 
     { 
      //.... 
     }); 

あなたはURLSToCheckコレクションに追加します:

URLSToCheck.Add(...) 

処理するURLを入力すると、URLSToCheck.CompleteAdding() となり、Parallel.ForEachは自動的に停止するはずです。

関連する問題