2012-11-12 3 views
6

この例では、https://stackoverflow.com/a/9980346/93647とここにはWhy is my disruptor example so slow?(質問の最後にあります)のパブリッシャーとアイテムが1つあります。1人のパブリッシャーと4人の並列コンシューマーによる混乱の例

私の場合、消費者の仕事はずっと複雑で時間がかかります。ですから、データを並行して処理する4人の消費者が必要です。

ですから、例えばプロデューサープロデュース番号の場合:1,2,3,4,5,6,7,8,9,10,11 ..私はconsumer1は1,5,9-をキャッチしたい

、 ...消費者2は2,6,10を捕まえる...消費者3は3,7,11を捕まえる...消費者4は4,8,12を捕まえる...(正確にはこれらの数字ではなく、アイデアはそのデータである並列に処理する必要があります。特定の数値がどのコンシューマで処理されるかは気にしません)

実際のアプリケーションのコンシューマ向け作業ではかなりコストがかかるため、これを並列に行う必要があります。私は消費者がマルチスレッドシステムのパワーを使うために異なるスレッドで実行されることを期待しています。

もちろん、私は4つのリングバッファを作成し、1つのコンシューマを1つのリングバッファに接続することができます。このように私は元の例を使用することができます。しかし、私はそれが正しいとは思わない。おそらく、1つのパブリッシャー(1つのリングバッファー)と4つのコンシューマーを作成するのは正しいでしょう - これが私の必要なものです。 Googleグループでは非常にまね質問へのリンクを追加

  • 1人のリング、多くの消費者(各消費者が「ウェイクアップ」するすべてのほかに、すべて:https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

    だから我々は2つのオプションがあります消費者は同じWaitStrategyを持っている必要があります)。

  • 多くの "1つのリング - 1つの消費者"(各消費者は、処理すべきデータにのみ目を覚まし、各消費者は独自のWaitStrategyを持つことができます)。

答えて

1

EDIT:コードが部分的にthe FAQから取られていることを忘れていました。このアプローチがフランクの提案よりも良いか悪いのか分かりません。

プロジェクトは厳しく文書化されていますが、それはいいと思うほど残念です。
はとにかく(あなたの最初のリンクに基づいて)次のスニップを試す - モノでテストしてOKのようです:

using System; 
using System.Threading.Tasks; 
using Disruptor; 
using Disruptor.Dsl; 

namespace DisruptorTest 
{ 
    public sealed class ValueEntry 
    { 
     public long Value { get; set; } 
    } 

    public class MyHandler : IEventHandler<ValueEntry> 
    { 
     private static int _consumers = 0; 
     private readonly int _ordinal; 

     public MyHandler() 
     { 
      this._ordinal = _consumers++; 
     } 

     public void OnNext(ValueEntry data, long sequence, bool endOfBatch) 
     { 
      if ((sequence % _consumers) == _ordinal) 
       Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal); 
      else 
       Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);      
     } 
    } 

    class Program 
    { 
     private static readonly Random _random = new Random(); 
     private const int SIZE = 16; // Must be multiple of 2 
     private const int WORKERS = 4; 

     static void Main() 
     { 
      var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default); 
      for (int i=0; i < WORKERS; i++) 
       disruptor.HandleEventsWith(new MyHandler()); 
      var ringBuffer = disruptor.Start(); 

      while (true) 
      { 
       long sequenceNo = ringBuffer.Next(); 
       ringBuffer[sequenceNo].Value = _random.Next();; 
       ringBuffer.Publish(sequenceNo); 
       Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value); 
       Console.ReadKey(); 
      } 
     } 
    } 
} 
+0

ありがとう!しようとします! – javapowered

+0

ringbuffer 'Next()' 'Publish'メソッドの呼び出しがスレッドセーフであるかどうか知っていますか?並行して呼び出すことはできますか?私は2つの異なるスレッドringbuffer 'Next'メソッドから呼び出すことができますか? – javapowered

+0

また、内部的にどのように混乱を招くのでしょうか?いくつのスレッドが作成されますか? Disruptorは消費者ごとに個別のスレッドを作成しますか?または何らかのスレッドプールが使用されていますか? – javapowered

0

リングバッファの仕様から、すべてのコンシューマがお客様のValueEventを処理しようとします。あなたの場合、あなたはそれを必要としません。

は、私はこのようにそれを解決:

があなたのValueEventと消費者が、彼はその場でテストイベントを取るとき、それはすでに処理されている場合、彼は次のフィールドに移りに加工フィールドを追加します。

最も美しい方法ではありませんが、バッファの仕組みです。

+0

あなたはそのフィールドに同期が必要ですか?それを 'volatile'宣言するか' Interlock'クラスを使って 'bool'フィールドを更新しますか?リングバッファに複数のコンシューマを接続する方法も? HandleEventsWithメソッドに消費者を1つだけ渡すことができます。これまでのところ、4つのリングバッファを作成し、毎回公開するために次のリングバッファを使用してそれらを循環させるほうが簡単だと思います:) – javapowered

+0

4つのリングバッファを作成すると、リングの「ロードバランシング」機能が失われますバッファ、それはあなたがそれを使用している理由だと思います。あなたの他のQでは、私はJAVAバッファを使用しているので、コードを表示することはできませんが、例に従うだけで、かなり明確です。 – Frank

+0

この特定のケースでは、私のタスクはほぼ「等しく」、4つの間でそれらを分割するだけでOKですので、「負荷分散」は必要ありません。しかし、それは興味深い特徴です。 Javaの例に従うべきですか?私はほとんどすべてのC#の例を見つけることができないので。 – javapowered

関連する問題