私はアプリケーションにTPL DataFlowを適用する方法に苦労しています。TPL DataFlowのパイプライン混乱 - データコールごとに新しいパイプラインを作成する必要がありますか?どのように流れるデータを追跡できますか?
以前はI was just using Tasksというトラッキングと管理を行う並列データ処理がありますが、DataFlowを実装してより詳細な制御を実現しようとしています。私はそれを、データを取得すると言うし、処理するために、タスクのパイプラインを構成しています
は、ここでそれは完全なget
データ、process
データへのパイプラインの例だと、log
:私は
TransformBlock<string, string> loadDataFromFile = new TransformBlock<string, string>(filename =>
{
// read the data file (takes a long time!)
Console.WriteLine("Loading from " + filename);
Thread.Sleep(2000);
// return our result, for now just use the filename
return filename + "_data";
});
TransformBlock<string, string> prodcessData = new TransformBlock<string, string>(data =>
{
// process the data
Console.WriteLine("Processiong data " + data);
Thread.Sleep(2000);
// return our result, for now just use the data string
return data + "_processed";
});
TransformBlock<string, string> logProcessComplete= new TransformBlock<string, string>(data =>
{
// Doesn't do anything to the data, just performs an 'action' (but still passses the data long, unlike ActionBlock)
Console.WriteLine("Result " + data + " complete");
return data;
});
// create a pipeline
loadDataFromFile.LinkTo(prodcessData);
prodcessData.LinkTo(logProcessComplete);
私はthis tutorialをフォローしようとしています。
私の混乱は、チュートリアルではこのパイプラインが「一度」の操作であるようです。それはパイプラインを作成し、それを一度オフにして完了します。
TPLデータフローを使用する通常の方法は、すべてのブロックを作成することで、リンク それらを一緒にして、一方の端にデータを入れて起動します。これは、データフローのライブラリは、設計された私が読んだと思われる方法に反するようです。
Stephen Clearyの "Concurrency in C#Cookbook"から。
しかし、私はtrack
のデータを「片端に」書き込んだ後にどうしたらよいか分かりません。私は、プログラムの複数の部分からprocessed
データを得ることができる必要があります。ユーザーが「File1」からデータを取得し、何かを行うために1つ、「File2」からデータを取得するために、同期パイプラインを流れる情報の一つだけがありますように、それがうまく動作します 『:
Console.WriteLine("waiting for File 1");
await loadFile1ButtonPress();
Console.WriteLine("waiting for File 2");
await loadFile2ButtonPress();
Console.WriteLine("Done");
は予想を生成:「これらが実行されている場合
public async Task loadFile1ButtonPress()
{
loadDataFromFile.Post("File1");
var data = await logProcessComplete.ReceiveAsync();
Console.WriteLine($"Got data1: {data}");
}
public async Task loadFile2ButtonPress()
{
loadDataFromFile.Post("File2");
var data = await logProcessComplete.ReceiveAsync();
Console.WriteLine($"Got data2: {data}");
}
dが、私が思う、このようなものが必要』出力:
waiting for File 1
Loading from File1
Processiong data File1_data
Result File1_data_processed complete
Got data1: File1_data_processed
waiting for File 2
Loading from File2
Processiong data File2_data
Result File2_data_processed complete
Got data2: File2_data_processed
Done
これは私には理にかなって、それだけを1つずつやっている:
しかし、ポイントは、私は非同期並列としてこれらの操作を実行したいです。私はこれをシミュレートした場合(たとえば、立て続けの両方の「ボタンの押下)を持つ:第2の動作は、最初よりも長くかかる場合
Console.WriteLine("waiting");
await Task.WhenAll(loadFile1ButtonPress(), loadFile2ButtonPress());
Console.WriteLine("Done");
は、この作業を行いましたか?
私は両方が(もともとこれは 仕事しませんでしたが、それは私が修正したバグだった - それは正しいアイテムを返すん)しかし、最初のデータを返すように期待していました。
私は、データとアクションを実行するActionBlock<string>
をリンクのようなものでした考えていた
:
public async Task loadFile1ButtonPress()
{
loadDataFromFile.Post("File1");
// instead of var data = await logProcessComplete.ReceiveAsync();
logProcessComplete.LinkTo(new ActionBlock<string>(data =>
{
Console.WriteLine($"Got data1: {data}");
}));
}
しかし、これは完全にパイプラインを変更していることがあることを利用していて、今loadFile2ButtonPress
がまったく動作しませんパイプライン。
同じブロックで複数のパイプラインを作成できますか?または、私は、データフローライブラリを使用するポイントを打ち負かしていると思われる、それぞれの操作のための全く新しいパイプライン(および新しいブロック)を作成する必要がありますか?
これがStackoverflowなどに最適な場所であるかどうかわかりません。 Codereview?ちょっと主観的かもしれない。あなたがlink your blocksをすることができ、コメントで言われよう
var subscription = logProcessComplete.AsObservable();
subscription.Subscribe(i => Console.WriteLine(i));
を:あなたは、いくつかのデータが処理された後に発生するいくつかのイベントが必要な場合
パイプラインは火災ではありません。それらは、未処理のエラーがスローされたとき、または最初のブロックで 'Complete()'を呼び出して完了したときにのみ終了し、完了を伝播します。毎回新しいパイプラインを作成する必要はありません。それを一度作成し、必要なだけ多くのファイルを投稿してください。アプリケーションを終了して残っているメッセージを処理したいときには 'Complete()'を呼び出してください。 –
これは私が思ったことです - 特定の入力に対して特定の出力*を実際に取得するにはどうすればいいですか? – Joe
BTWあなたは補完を伝播しません。つまり、 'loadDataFromFile'で' Complete() 'を呼び出すと補完は' logProcessComplete'に伝播しません。 'LinkTo'呼び出しに' PropagateCompletion'をtrueに設定した 'DataflowLinkOptions'オブジェクトを渡します –