2016-11-27 8 views
3

ループするTPLデータフロー内の完了を検出する方法を決定する際に問題があります。TPLデータフローループの完了

GETリクエストをリモートサーバーに送信し、データレスポンスを処理しているデータフローの一部にフィードバックループがあります(データフローをさらに変換して結果をコミットする)。

データソースは、その結果を1000レコードのページに分割し、使用可能なページの数を教えません。私はちょうど私がデータの完全なページより少なくなるまで読むことを保たなければならない。

通常、ページ数は1です。頻繁に10までです。毎回1000sがあります。

私は最初から多くのフェッチを要求しています。
これに対処するためにスレッドプールを使用できるようにするには、すべて問題ありません。複数のデータ要求をキューに入れて同時に要求できます。多数のページを取得する必要があるインスタンスを見つけた場合、私はこのためにすべてのスレッドを使用したいと考えています。私は他の人たちが終わっている間に一本の糸が残っているのを残したくない。

//generate initial requests for activity 
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp)); 

//fetch the initial requests and feedback more requests to our input buffer if we need to 
TransformBlock<DataRequest, DataResponse> fetch = null; 
fetch = new TransformBlock<DataRequest, DataResponse>(async req => 
{ 
    var resp = await Fetch(req); 

    if (resp.Results.Count == 1000) 
     await fetch.SendAsync(QueueAnotherRequest(req)); 

    return resp; 
} 
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); 

//commit each type of request 
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp)); 

request.LinkTo(fetch); 
fetch.LinkTo(commit); 

//when are we complete? 

QueueRequestsIEnumerable<DataRequest>を生成します。私のような、データフローには、このロジックをドロップすると

私が持っている問題があります。次のNページのリクエストを一度にキューに入れ、これは私が必要とするよりもわずかに多くのコールを送信することを意味します。 DataRequestのインスタンスはLastPageカウンタを共有して、最後のページの後にわたっていることを無意識のうちに要求しないようにします。すべてこれは問題ありません。

問題:
この例で示したように、フェッチの入力バッファにさらに多くの要求をフィードバックしてループすると、補完を通知する(または検出する)方法に問題があります。私はリクエストからのフェッチで完了を設定することはできません。完了がセットされると、私はそれ以上のフィードバックはできません。

フェッチ時に入力バッファと出力バッファが空であることを監視できますが、完了時に要求があってもフェッチがまだビジー状態にあると考えられます。

フェッチがビジーであることがわかっています(入力があったか入力を処理中です)。

これを解決するための明白な方法がないのですか?

  • さらにリクエストをキューに入れるのではなく、フェッチ内でループする可能性があります。その問題は、設定した最大数のスレッドを使用して、リモートサーバーに何をしているのかを調整できるようにすることです。ブロック内の並列ループがブロック自体とスケジューラを共有し、その結果のスレッド数をスケジューラを介して制御できますか?

  • 完了シグナリングを処理するためにfetch用のカスタム変換ブロックを作成することができました。このような単純なシナリオのための多くの作業のようです。

ご協力いただきありがとうございました!

+0

今、すべての要求が最初のブロックで生成された瞬間にですか? – VMAtm

+0

ええ、パイプラインを開始するには、私は 'foreach(todolistのvar c){request.Post(c); }; '。それから私は 'request.Complete();'を呼び出すことができます。これ以上のリクエストは追加しません。 – ajk

+0

@ajkもしあなたがしているのであれば、すべてのブロックリンクで 'a.LinkTo(b、new DataflowLinkOptions {PropagateCompletion = true})'を使うのはなぜですか? 'request.Complete()'を呼び出すと、すべての項目がパイプラインのすべての段階を通過した後、 'commit.Completion'が完了状態に移行します。 –

答えて

0

は、今の私は、フェッチブロックに簡単なビジー状態カウンタを追加しました: - 次のように私は、完全な通知するために使用

int fetch_busy = 0; 

TransformBlock<DataRequest, DataResponse> fetch_activity=null; 
fetch = new TransformBlock<DataRequest, ActivityResponse>(async req => 
    { 
     try 
     { 
      Interlocked.Increment(ref fetch_busy); 
      var resp = await Fetch(req); 

      if (resp.Results.Count == 1000) 
      { 
       await fetch.SendAsync(QueueAnotherRequest(req)); 
      } 

      Interlocked.Decrement(ref fetch_busy); 
      return resp; 
     } 
     catch (Exception ex) 
     { 
      Interlocked.Decrement(ref fetch_busy); 
      throw ex; 
     } 
    } 
    , new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); 

: - doesntの

request.Completion.ContinueWith(async _ => 
    { 
     while (fetch.InputCount > 0 || fetch_busy > 0) 
     { 
      await Task.Delay(100); 
     } 

     fetch.Complete(); 
    }); 

は非常にエレガントに見えますしかし、私はうまくいくはずです。 propagation of completion of the blockを指定してDataflowLinkOptions

TPLデータフローで
+2

私の理解では、 'SendAsync'は、次のブロックが新しい項目を受け入れてくれることを確認するとすぐに戻ります。したがって、 '' fetch.SendAsync'が '' fetch_busy''をインクリメントする機会を得る前に終了する( 'fetch_busy'を減らす)ことが可能です。その間に、 'fetch'ブロックがあなたの継続によって完全であるとマークされることがあります(' fetch_busy'と 'fetch.InputCount'が両方ともゼロになる場合)。飛行中の 'Fetch'タスクが1000個のアイテムを生成し、別の' SendAsync'を試みると、それは静かに失敗します。 –

+0

これは明らかにかなり遠いですが、想像もつかないシナリオなので、 'await fetch.SendAsync'が' false'を返す場合は、投げてください。また、引数として非同期ラムダを指定した 'ContinueWith'は' Task 'を返します(結果で何かをすることに決心した場合、これは驚きにつながります)。 –

+0

@KirillShlenskiyありがとう、はい、私は調査します。 'ContinueWith'の中で' fetch.InputCount> 0'のチェックでそれを軽減しようとしました。 'await fetch.SendAsync'は、新しくキューに入れられたリクエストが' fetch.InputCount'に表示される前に返すことができますか? – ajk

2

、あなたがすることができlink the blocks:その後

request.LinkTo(fetch, new DataflowLinkOptions { PropagateCompletion = true }); 
fetch.LinkTo(commit, new DataflowLinkOptions { PropagateCompletion = true }); 

は、あなたは単にrequestブロックのためComplete()メソッドを呼び出して、すれば完了です!

// the completion will be propagated to all the blocks 
request.Complete(); 

あなたが使用する必要があり、最終的な事は、最後のブロックのCompletionタスクプロパティです:

commit.Completion.ContinueWith(t => 
    { 
     /* check the status of the task and correctness of the requests handling */ 
    }); 
+0

こんにちは@VMAtm、上記のコメントで説明したように、これは理解されています。ただし、Fetchに完了が伝播されると、Fetchは入力バッファにさらにメッセージを送信できなくなります。フェッチは、応答を受け取ったときに、より多くのデータが利用可能であることを検出すると、メッセージを自分自身に戻します。フェッチ時に完了が設定されると、このフィードバック方法はもはや許可されません。 – ajk

+0

さて、 'fetch'から' commit'まで補完を伝播し、 'fetch'状態をチェックするために' request.Completion.ContinueWith'ループを使用してください。 – VMAtm

+0

多くの感謝の意を表します。フェッチが完了したことを知る良い方法があるかどうかは不明でしたが、そうでなければこれで生きることができます! – ajk