2017-03-16 9 views
0

TPL DataFlowに商品を投稿すると、投稿が遅れる可能性のあるメカニズムはありますか?アイテムが処理される前に遅れて送信されたデータフロー

public partial class BasicDataFlowService 
{ 
    private readonly ActionBlock<string> workerBlock; 

    public BasicDataFlowService() 
    { 
     workerBlock = new ActionBlock<string>(file => DoWork(file), new ExecutionDataflowBlockOptions() 
     { 
      MaxDegreeOfParallelism = 32 
     }); 
    } 

    partial void DoWork(string fileName); 

    private void AddToDataFlow(string file) 
    { 
     workerBlock.Post(file); 
    } 
} 

AddToDataFlowの中で、私は遅延を指定できるようにしたいと思います(例えば、我々は30秒間処理を延期したいことを決定した場合)。

TransFormBlocknew System.Threading.ManualResetEvent(false).WaitOne(1000);と使用することを検討しました。

var requeueBlock = new TransformBlock<string, string>(file => 
{ 
    new System.Threading.ManualResetEvent(false).WaitOne(1000); 
    return file; 
}); 

requeueBlock.LinkTo(workerBlock); 

しかし、チェーン内の他のブロックで使用できる不必要なスレッドを消費しているように見えます。

答えて

0

まず、ManualResetEventをシングルトンとして保存する必要があります。そうしないと、すべてのスレッドが待機する独自のオブジェクトを取得し、あなたのアプローチはうまくいかないでしょう。

パイプラインでAppDomainの内部で同期を行う必要がある場合は、第2に、ManualResetEventの代わりにManualResetEventSlimバージョンを検討してください。

マシンのコアを無駄に待たずに再利用したい場合は、SpinWait軽量構造を調べる必要があります。あなたは、この場合のJoseph Albahari' articleが役立つことがあります。Sleep(0)Sleep(1)またはYieldメソッド呼び出しで、それはあなたのケースのために非常に効果的ですので:

// singleton variable 
bool _proceed; 

var requeueBlock = new TransformBlock<string, string>(file => 
{ 
    var spinWait = new SpinWait(); 
    while (!_proceed) 
    { 
     // ensure we have the latest _proceed value 
     Thread.MemoryBarrier(); 
     // try to spin for a while 
     // after some spins, yield to another thread 
     spinWait.SpinOnce(); 
    } 
    return file; 
}); 

SpinWait内部yeildする方法を決定します。

0

workerBlockに値を転記する前に遅延を追加するには、遅延を挿入して値を転記する前に待ってください。 workerBlockの容量が制限されている場合は、await SendAsyncです。目標を達成するためのいくつかのオプション:

private async Task AddToDataflow(string file, TimeSpan delay) { 
    await Task.Delay(delay); 
    await workerBlock.SendAsync(file); 
} 

private async Task AddToDataflow(string file) { 
    var delay = TimeSpan.FromSeconds(30); 
    await Task.Delay(delay); 
    await workerBlock.SendAsync(file); 
} 

private async void AddToDataflow(string file) { 
    var delay = TimeSpan.FromSeconds(30); 
    await Task.Delay(delay); 
    workerBlock.Post(file); 
} 
関連する問題