2017-01-02 6 views

答えて

3

アプリケーションの存続期間中パイプラインを維持する必要があり、要求だけでなく静的クラスを使用して保持する必要がある場合は、アクションブロックでcompleteを呼び出す必要は必ずしもありません。必要に応じて、アプリケーションと処理パイプラインを分離することもできます。これらは、データベース・メッセージ・キューまたは別個のサーバー側アプリケーションによって分離できます。

@svickは、TaskCompletionSourceを使用してパイプラインが特定の項目で終了した時点を判断するのに適しています。ここで一緒にすべてを置くことは役に立つかもしれません迅速なサンプルです:パイプライン内の特定のジョブの完了を調整するいくつかの方法が

public class Controller { 

    public async Task<int> PostToPipeline(int inputValue) { 
     var message = new MessageIn(inputValue); 
     MyPipeline.InputBuffer.Post(message); 
     return await message.Completion.Task; 
    } 
} 

public class MessageIn { 
    public MessageIn(int value) { 
     InputValue = value; 
     Completion = new TaskCompletionSource<int>(); 
    } 

    public int InputValue { get; set; } 
    public TaskCompletionSource<int> Completion { get; set; } 
} 

public class MessageProcessed { 
    public int ProcessedValue { get; set; } 
    public TaskCompletionSource<int> Completion { get; set; } 
} 

public static class MyPipeline { 

    public static BufferBlock<MessageIn> InputBuffer { get; private set; } 
    private static TransformBlock<MessageIn, MessageProcessed> transform; 
    private static ActionBlock<MessageProcessed> action; 

    static MyPipeline() { 
     BuildPipeline(); 
     LinkPipeline(); 

    } 

    static void BuildPipeline() { 
     InputBuffer = new BufferBlock<MessageIn>(); 

     transform = new TransformBlock<MessageIn, MessageProcessed>((Func<MessageIn, MessageProcessed>)TransformMessage, new ExecutionDataflowBlockOptions() { 
      MaxDegreeOfParallelism = Environment.ProcessorCount, 
      BoundedCapacity = 10 
     }); 

     action = new ActionBlock<MessageProcessed>((Action<MessageProcessed>)CompletedProcessing, new ExecutionDataflowBlockOptions() { 
      MaxDegreeOfParallelism = Environment.ProcessorCount, 
      BoundedCapacity = 10 
     }); 
    } 

    static void LinkPipeline() { 
     InputBuffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true }); 
     transform.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true }); 
    } 

    static MessageProcessed TransformMessage(MessageIn message) { 
     return new MessageProcessed() { 
      ProcessedValue = message.InputValue++, 
      Completion = message.Completion 
     }; 
    } 

    static void CompletedProcessing(MessageProcessed message) { 
     message.Completion.SetResult(message.ProcessedValue); 
    } 
} 

あります。完了のソースを待つことは、あなたのニーズに最適なアプローチかもしれません。

+0

はい、SendAsync/Postをブロックに追加し、Completionを待たずに戻ることができますが、これは実際に「返されたオブジェクト」を知りたいときに設計上の問題を引き起こします。 理想的には、すべてのパイプを閉じずに非同期の完了をしたいのですが、再初期化せずに収入を永久に待たなければなりません。 – mike00

+0

素晴らしい!私の問題に対する美しい解決策。どうもありがとうございました! – mike00

4

Dataflowは、(単純なパイプライン以上のものをサポートしているため)特定の入力に対してパイプラインの出力を得るための優れたソリューションはありません。

これを回避するには、TaskCompletionSource<T>を作成し、それをパイプラインに入力する必要があります。パイプラインの各ブロックはそれを次のブロックに送り、最後のブロックはSetResult()を呼び出します。

パイプラインへの入力を送信するコードはawaitTaskCompletionSourceTaskパイプラインの出力を待つことができます。

+0

その解決策を指摘してくれてありがとう@svick! – mike00