内で実行されている、私は三つのブロックを持っています。アクションブロックでCompletion()を呼び出しずにパイプ全体を停止することなく永遠に実行されるようなパイプを作成する方法。TPLデータフローパイプが常にサービス例えば
答えて
アプリケーションの存続期間中パイプラインを維持する必要があり、要求だけでなく静的クラスを使用して保持する必要がある場合は、アクションブロックでcompleteを呼び出す必要は必ずしもありません。必要に応じて、アプリケーションと処理パイプラインを分離することもできます。これらは、データベース・メッセージ・キューまたは別個のサーバー側アプリケーションによって分離できます。
@svickは、TaskCompletionSourceを使用してパイプラインが特定の項目で終了した時点を判断するのに適しています。ここで一緒にすべてを置くことは役に立つかもしれません迅速なサンプルです:パイプライン内の特定のジョブの完了を調整するいくつかの方法が
public class Controller {
public async Task<int> PostToPipeline(int inputValue) {
var message = new MessageIn(inputValue);
MyPipeline.InputBuffer.Post(message);
return await message.Completion.Task;
}
}
public class MessageIn {
public MessageIn(int value) {
InputValue = value;
Completion = new TaskCompletionSource<int>();
}
public int InputValue { get; set; }
public TaskCompletionSource<int> Completion { get; set; }
}
public class MessageProcessed {
public int ProcessedValue { get; set; }
public TaskCompletionSource<int> Completion { get; set; }
}
public static class MyPipeline {
public static BufferBlock<MessageIn> InputBuffer { get; private set; }
private static TransformBlock<MessageIn, MessageProcessed> transform;
private static ActionBlock<MessageProcessed> action;
static MyPipeline() {
BuildPipeline();
LinkPipeline();
}
static void BuildPipeline() {
InputBuffer = new BufferBlock<MessageIn>();
transform = new TransformBlock<MessageIn, MessageProcessed>((Func<MessageIn, MessageProcessed>)TransformMessage, new ExecutionDataflowBlockOptions() {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 10
});
action = new ActionBlock<MessageProcessed>((Action<MessageProcessed>)CompletedProcessing, new ExecutionDataflowBlockOptions() {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 10
});
}
static void LinkPipeline() {
InputBuffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true });
transform.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true });
}
static MessageProcessed TransformMessage(MessageIn message) {
return new MessageProcessed() {
ProcessedValue = message.InputValue++,
Completion = message.Completion
};
}
static void CompletedProcessing(MessageProcessed message) {
message.Completion.SetResult(message.ProcessedValue);
}
}
あります。完了のソースを待つことは、あなたのニーズに最適なアプローチかもしれません。
Dataflowは、(単純なパイプライン以上のものをサポートしているため)特定の入力に対してパイプラインの出力を得るための優れたソリューションはありません。
これを回避するには、TaskCompletionSource<T>
を作成し、それをパイプラインに入力する必要があります。パイプラインの各ブロックはそれを次のブロックに送り、最後のブロックはSetResult()
を呼び出します。
パイプラインへの入力を送信するコードはawait
TaskCompletionSource
のTask
パイプラインの出力を待つことができます。
その解決策を指摘してくれてありがとう@svick! – mike00
- 1. TPLデータフロー - 非常に速いプロデューサー、非常に高速なコンシューマーOutOfMemory例外
- 2. は、例えば、通常の整数
- 3. TPLと例外処理
- 4. なぜマングースは常に、例えば私のコレクション名
- 5. オブジェクトが例えば、配列
- 6. Tplで例外を処理する
- 7. 例えばLLVM
- 8. ?例えば
- 9. 例えば
- 10. 例えばジャバスクリプト
- 11. 例えば、
- 12. 例えば
- 13. 例えば、データベースクエリ
- 14. 例えば
- 15. 例えばtensorflow
- 16. 例えば、コミットメント
- 17. 例えば
- 18. 例えば
- 19. 例えば、
- 20. 例えばParse.com
- 21. は、例えば
- 22. 例えばセレンウェブドライバ
- 23. 例えばテキストファイル
- 24. は、例えば
- 25. 例えば
- 26. 例えばスイフト
- 27. マッピングコレクション/例えば
- 28. 例えばパイソン
- 29. 例えばループ
- 30. 例えば
はい、SendAsync/Postをブロックに追加し、Completionを待たずに戻ることができますが、これは実際に「返されたオブジェクト」を知りたいときに設計上の問題を引き起こします。 理想的には、すべてのパイプを閉じずに非同期の完了をしたいのですが、再初期化せずに収入を永久に待たなければなりません。 – mike00
素晴らしい!私の問題に対する美しい解決策。どうもありがとうございました! – mike00