2016-11-23 6 views
-2

テキストファイルに格納されている大量のデータを処理したい。c#PLinq AsParallel選択がハングアップする

var result = File 
    .ReadLines(textBox1.Text) 
    .AsParallel() 
    .WithDegreeOfParallelism(100) 
    .Select(line => ProcessLine(line)); 

それを処理し、ArrayListに追加して、行を取得しますProcessLine方法:ここで私はそれがより速く動作させるために使用するコードです。

すべての処理が完了した後、ArrayListをDatagrid にロードしますが、時にはすべての行が完了し、時にはハングアップすることがあります。理由はわかりません。

提案がありますか?ここ更新

を方法ProcessLine

private string ProcessLine(string domain) 
     { 



      ProcessStartInfo cmdinfo = new ProcessStartInfo(); 
      cmdinfo.FileName = "cmd.exe"; 
      cmdinfo.Arguments = "/c nslookup"; 
      cmdinfo.RedirectStandardInput = true; 
      cmdinfo.RedirectStandardOutput = true; 
      cmdinfo.CreateNoWindow = true; 
      cmdinfo.UseShellExecute = false; 
      cmdinfo.RedirectStandardError = false; 
      Process cmdd = new Process(); 
      cmdd = Process.Start(cmdinfo); 
      string spf = "none"; 
     createproc: 
      try 
      { 



       cmdd.StandardInput.WriteLine("set q=txt"); 
       cmdd.StandardInput.Flush(); 
       cmdd.StandardInput.WriteLine(domain); 

       cmdd.StandardInput.WriteLine("exit"); 
       cmdd.StandardInput.WriteLine("exit"); 
       StreamReader r = cmdd.StandardOutput; 

       //cmdd.WaitForExit(); 
       cmdd.Close(); 
       spf = ""; 
       string rdl = string.Empty; 
       bool spffound = false; 
       while (rdl != null) 
       { 
        try 
        { 

         rdl = r.ReadLine(); 

         if (rdl.Contains("v=spf")) 
         { 
          spffound = true; 
          spf = rdl.Trim(); 
          this.Invoke(new MethodInvoker(delegate 
          { 
           textBox2.AppendText("domain found : " + domain + Environment.NewLine + "SPF = " + spf + Environment.NewLine); 
           textBox2.Update(); 

          })); 
          break; 
         } 

        } 
        catch (Exception) 
        { 

        } 
       } 
       if (!spffound) 
        spf = "none"; 

       nbrDoms++; 
       this.Invoke(new MethodInvoker(delegate 
       { 
        DomsElapsed.Text = nbrDoms + " Domains Elapsed"; 
        DomsElapsed.Update(); 

       })); 
       SPFRecord srx = new SPFRecord((string)spf.Clone(), (string)domain.Clone()); 

       if (srx == null) 
       { 
        cmdd.Kill(); 
        cmdinfo = new ProcessStartInfo(); 
        cmdinfo.FileName = "cmd.exe"; 
        cmdinfo.Arguments = "/c nslookup"; 
        cmdinfo.RedirectStandardInput = true; 
        cmdinfo.RedirectStandardOutput = true; 
        cmdinfo.CreateNoWindow = true; 
        cmdinfo.UseShellExecute = false; 
        cmdinfo.RedirectStandardError = false; 

        cmdd = new Process(); 
        cmdd.StartInfo = cmdinfo; 
        cmdd.Start(); 

        goto createproc; 
       } 

       lock (pageManager) 
       { 
        pageManager.AddRecord(srx); 
       } 
       //this.Invoke(new MethodInvoker(delegate 
       //{ 
       //})); 

      } 
      catch(Exception exc) 
      { 
       cmd.Kill(); 

       cmdinfo = new ProcessStartInfo(); 
       cmdinfo.FileName = "cmd.exe"; 
       cmdinfo.Arguments = "/c nslookup"; 
       cmdinfo.RedirectStandardInput = true; 
       cmdinfo.RedirectStandardOutput = true; 
       cmdinfo.CreateNoWindow = true; 
       cmdinfo.UseShellExecute = false; 
       cmdinfo.RedirectStandardError = false; 

       cmdd = new Process(); 
       cmdd.StartInfo = cmdinfo; 
       cmdd.Start(); 
       Thread.Sleep(10); 
       goto createproc; 
      } 
      return ""; 
     } 
+0

あなたは 'ProcessLine'メソッドにコードを投稿できますか?私は競争状態をそこに疑う。 – RePierre

+0

@RePierre更新を見てください – Th3Wolf

+0

WithDegreeOfParallelism(100)を使用すると、デフォルトで –

答えて

0

file.readalllinesようなもの(psudoコード)

基本的に、各スレッドは、他のロックされていると、文字列にテキストの行を読み取られますあなたはこれを高速化しようとしているのですか、ファイルが大きすぎてメモリに収まらないからですか?

+0

どうやってスレッドがお互いをロックしているのか、元の質問では何も利用できないと思いましたか? –

+0

私は、 +15000000 records 各レコードを処理してページマネージャに追加します。ページがない場合はページマネージャが空のページ(ArrayList)を作成し、レコードを広告します。ページ数== 27ページマネージャは別のページを作成してからレコードを追加します処理終了後iユーザーが次のページをクリックして、選択したページから読み込んでデータグリッドに表示すると、各ページがデータグリッドに読み込まれます。 – Th3Wolf

+0

Mrinal Kamboj - コードを読んだ後にロックを引き受けました。そして、私がメモリに読み込むことについて尋ねた理由は、テキストファイルからすべてのスレッドを読み込もうとしているという事実です。これは時間の無駄です。私は1つの文字列、それからすべての文字列を読むだろう。 – Trey

-1

あなたのコードに正しくない2つのものがあります:あなたは、レコードの追加のために、それはスレッドセーフにするために、ロックが、データと並列処理量されているが

  1. lock (pageManager) 
    { 
        pageManager.AddRecord(srx); 
    } 
    

は、あなたのケースでは非常に高いです、WithDegreeOfParallelism(100)、それはどこか悪影響を及ぼしており、システムをほぼ膝に持っています。私はあなたが100以上の論理的/物理的コアにサーバーを使用していないことを確信しています。 実際、デフォルトでPLinqはシステムリソースを利用した並列処理を可能にします。 WithDegreeOfParallelism(100)ステートメントを完全に削除することができます。ロックについて

  • 理想がはるかに効率的な運用のためにあるConcurrentBag<T>またはConcurrentQueue<T>のような安全なスレッド、同時実行準拠したコレクションを、使用することであろうと、あなたはまた、ToList()を使用してList<T>ポスト操作に変換することができます。
  • もう一つの課題は、あなたがpageManagerをロックして、理想的にこのロックは、共通の共有プライベートクラスobjectにしなければならない、同じを変更するのではなく、完全なコレクションをロックしている場合、他のオプションがいるICollectionのSyncRootにアクセスし、それをロックすることです次のようにもProcessLineは、新しいプロセスを起動さ

    lock((pageManager as ICollection).SyncRoot) 
    

、あなたがプロセスとして100のこのようなプロセスは、出てくることを期待しない、それは非常に非効率的だし、理想的に今は、Async-Awaitのユースケースを持っていますリモートプロセスを呼び出すために、th実際にはスレッドをまったく使用しないことになります。

+0

ねえ、プロセスラインを変更してthis.Invoke(.....)行をすべて削除しましたが、それらを追加すると問題がそこにあります – Th3Wolf

0

[OK]を、言及するいくつかのこと:

  1. gotoステートメントを使用しないでください - それはあなたの方法が何をするかを理解するのは難しいです。 Processの作成を別の方法に移して、代わりにそのメソッドを呼び出してください。goto
  2. プロセスあなたは何をしたいのですか?をロードするのに時間がかかります。このような負荷のペナルティを避けるために、プロセスを作成して呼び出す代わりに、そのプロセスを同じ方法で置き換えてみてください。プロセスを呼び出すことなくnslookupexampleがあります。あなたのニーズに合わせて調整してみてください
  3. lockを削除してください。 - アプリケーションがどうにかして100スレッドを使用する場合、lockは時間の無駄です。 99個のスレッドを待っており、スレッドにデータをプッシュします。 @ Mrinal Kambojが指摘するように、スレッドセーフなコレクションを使うことができます。この場合、BlockingCollection<T>を使用して結果をそこに追加します。キューのもう一方の端には、到着時に各項目をリスニングして消費するpageManagerがあります。
  4. UIは、にも時間がかかります。をリフレッシュするために、別々のサイクルが必要です。 pageManager.AddRecord()何らかの形でUIをリフレッシュする必要がある場合、他のスレッドは追加操作だけを待機しません。
  5. UIの更新は、コントロールを作成したスレッドで行う必要があり、そのスレッドを待っている場合、そのスレッドはUIを更新できません。

全体的なアルゴリズムは次のようになります。

public class Engine 
{ 
    private readonly BlockingCollection<string> _messagePipeline = new BlockingCollection<string>(); 

    public BlockingCollection<string> MessagePipeline 
    { 
     get { return _messagePipeline; } 
    } 

    public void Process(string file) 
    { 
     File.ReadLines(file) 
      .AsParallel() 
      .ForAll(line => 
      { 
       var nsLookupResult = NsLookupMethod(line); 
       if(nsLookupResult.HasInfoYouNeed) 
        _messagePipeline.Add(nsLookupResult.DisplayInfo); 
      }); 
    } 
} 

public class MainForm : Form 
{ 
    private readonly Engine _engine; // ... 

    private void OnStartButtonClick(object sender, EventArgs e) 
    { 
     var cts = new CancellationTokenSource(); 
     _engine.Process(textbox1.Text); 
     Task.Factory.StartNew(()=> 
     { 
      foreach(var message in _engine.MessagePipeline.GetConsumingEnumerable()) 
      { 
       // show the message 
       Application.DoEvents(); // allow the app to process other events not just pushing messages. 
      } 
     }, cts.Token, 
     TaskCreationOptions.PreferFairness, 
     // Specify that you want UI updates to be done on the UI thread 
     // and not on any other thread 
     TaskScheduler.FromCurrentSynchronizationContext()); 
    } 
} 

そして、それはそれを行う必要があります。私は実際にこの種のロジックの(多かれ少なかれ学問的な)exampleを持っています。 UI更新ロジックはアプリケーションのMainFormにあり、処理ロジックはEngine classです。そこに見てください。

関連する問題