2017-07-04 5 views
1

私はアプリケーションにTPL DataFlowを適用する方法に苦労しています。TPL DataFlowのパイプライン混乱 - データコールごとに新しいパイプラインを作成する必要がありますか?どのように流れるデータを追跡できますか?

以前はI was just using Tasksというトラッキングと管理を行う並列データ処理がありますが、DataFlowを実装してより詳細な制御を実現しようとしています。私はそれを、データを取得すると言うし、処理するために、タスクのパイプラインを構成しています

は、ここでそれは完全なgetデータ、processデータへのパイプラインの例だと、log:私は

TransformBlock<string, string> loadDataFromFile = new TransformBlock<string, string>(filename => 
{ 
    // read the data file (takes a long time!) 
    Console.WriteLine("Loading from " + filename); 
    Thread.Sleep(2000); 

    // return our result, for now just use the filename 
    return filename + "_data"; 
}); 

TransformBlock<string, string> prodcessData = new TransformBlock<string, string>(data => 
{ 
    // process the data 
    Console.WriteLine("Processiong data " + data); 
    Thread.Sleep(2000); 

    // return our result, for now just use the data string 
    return data + "_processed"; 
}); 

TransformBlock<string, string> logProcessComplete= new TransformBlock<string, string>(data => 
{ 
    // Doesn't do anything to the data, just performs an 'action' (but still passses the data long, unlike ActionBlock) 
    Console.WriteLine("Result " + data + " complete"); 
    return data; 
}); 

// create a pipeline 
loadDataFromFile.LinkTo(prodcessData); 
prodcessData.LinkTo(logProcessComplete); 

私はthis tutorialをフォローしようとしています。

私の混乱は、チュートリアルではこのパイプラインが「一度」の操作であるようです。それはパイプラインを作成し、それを一度オフにして完了します。

TPLデータフローを使用する通常の方法は、すべてのブロックを作成することで、リンク それらを一緒にして、一方の端にデータを入れて起動します。これは、データフローのライブラリは、設計された私が読んだと思われる方法に反するようです。

Stephen Clearyの "Concurrency in C#Cookbook"から。

しかし、私はtrackのデータを「片端に」書き込んだ後にどうしたらよいか分かりません。私は、プログラムの複数の部分からprocessedデータを得ることができる必要があります。ユーザーが「File1」からデータを取得し、何かを行うために1つ、「File2」からデータを取得するために、同期パイプラインを流れる情報の一つだけがありますように、それがうまく動作します 『:

Console.WriteLine("waiting for File 1"); 
await loadFile1ButtonPress(); 
Console.WriteLine("waiting for File 2"); 
await loadFile2ButtonPress(); 
Console.WriteLine("Done"); 

は予想を生成:「これらが実行されている場合

public async Task loadFile1ButtonPress() 
{ 
    loadDataFromFile.Post("File1"); 
    var data = await logProcessComplete.ReceiveAsync(); 
    Console.WriteLine($"Got data1: {data}"); 
} 

public async Task loadFile2ButtonPress() 
{ 
    loadDataFromFile.Post("File2"); 
    var data = await logProcessComplete.ReceiveAsync(); 
    Console.WriteLine($"Got data2: {data}"); 
} 

dが、私が思う、このようなものが必要』出力:

waiting for File 1 
Loading from File1 
Processiong data File1_data 
Result File1_data_processed complete 
Got data1: File1_data_processed 
waiting for File 2 
Loading from File2 
Processiong data File2_data 
Result File2_data_processed complete 
Got data2: File2_data_processed 
Done 

これは私には理にかなって、それだけを1つずつやっている:

enter image description here

しかし、ポイントは、私は非同期並列としてこれらの操作を実行したいです。私はこれをシミュレートした場合(たとえば、立て続けの両方の「ボタンの押下)を持つ:第2の動作は、最初よりも長くかかる場合

Console.WriteLine("waiting"); 
await Task.WhenAll(loadFile1ButtonPress(), loadFile2ButtonPress()); 
Console.WriteLine("Done"); 

は、この作業を行いましたか?

enter image description here

私は両方が(もともとこれは 仕事しませんでしたが、それは私が修正したバグだった - それは正しいアイテムを返すん)しかし、最初のデータを返すように期待していました。

私は、データとアクションを実行する ActionBlock<string>をリンクのようなものでした考えていた

public async Task loadFile1ButtonPress() 
{ 
    loadDataFromFile.Post("File1"); 
    // instead of var data = await logProcessComplete.ReceiveAsync(); 

    logProcessComplete.LinkTo(new ActionBlock<string>(data => 
    { 
     Console.WriteLine($"Got data1: {data}"); 
    })); 
} 

しかし、これは完全にパイプラインを変更していることがあることを利用していて、今loadFile2ButtonPressがまったく動作しませんパイプライン。

同じブロックで複数のパイプラインを作成できますか?または、私は、データフローライブラリを使用するポイントを打ち負かしていると思われる、それぞれの操作のための全く新しいパイプライン(および新しいブロック)を作成する必要がありますか?

これがStackoverflowなどに最適な場所であるかどうかわかりません。 Codereview?ちょっと主観的かもしれない。あなたがlink your blocksをすることができ、コメントで言われよう

var subscription = logProcessComplete.AsObservable(); 
subscription.Subscribe(i => Console.WriteLine(i)); 

を:あなたは、いくつかのデータが処理された後に発生するいくつかのイベントが必要な場合

+0

パイプラインは火災ではありません。それらは、未処理のエラーがスローされたとき、または最初のブロックで 'Complete()'を呼び出して完了したときにのみ終了し、完了を伝播します。毎回新しいパイプラインを作成する必要はありません。それを一度作成し、必要なだけ多くのファイルを投稿してください。アプリケーションを終了して残っているメッセージを処理したいときには 'Complete()'を呼び出してください。 –

+0

これは私が思ったことです - 特定の入力に対して特定の出力*を実際に取得するにはどうすればいいですか? – Joe

+0

BTWあなたは補完を伝播しません。つまり、 'loadDataFromFile'で' Complete() 'を呼び出すと補完は' logProcessComplete'に伝播しません。 'LinkTo'呼び出しに' PropagateCompletion'をtrueに設定した 'DataflowLinkOptions'オブジェクトを渡します –

答えて

2

は、あなたの最後のブロックAsObservable作るべき、とRx.Netでいくつかの小さなコードを追加します。 2つ以上のブロックに、with a predicate。その場合、メッセージはの最初のブロックにのみ配信されます。 BroadcastBlockを作成して、メッセージのコピーを各リンクされたブロックに配信することもできます。

他のすべてのブロックメッセージで望ましくないものがNullTargetにリンクされていることを確認してください。そうでない場合は、パイプラインに永遠に残り、完了を停止します。

複数のリンクの場合と同様に、パイプラインが完了を正しく処理していることを確認して、完了も最初にリンクされたブロックに伝播されます。

+0

ありがとう。これは質問に答えますが、私はまだDataFlowを使ってタスクを管理するのに丸みのある穴に四角いペグを合わせる感覚があります。興味深いことに、Rxを使ってみました。DataFlowはこれを思い出してくれます。*ストリーム*タイプのデータや、データのソース/パブリッシャーを持っていて、それに反応したい場所に最適です。それは私が元の問題を解決する手助けをしているような気がしません。 – Joe

+0

'Rx.Net'は、パイプラインを通って渡されたいくつかのデータについてあなたに通知するために簡単に使うことができます。これは' fork'操作のようなものです。 'TPL Dataflow'自体は、内部の特定のメッセージのステータスを提供するようには設計されていませんでした。 – VMAtm

+0

「TPLデータフロー自体は、特定のメッセージのステータスを提供するようには設計されていませんでした」 - LINQPadにDataflowブロックをダンプしたことがありますか?彼らは非常にうまく視覚化される。しかし、それが行われ、それは公衆のメンバーが利用できない内部の "魔法"でなければなりません。 – springy76

関連する問題