ループするTPLデータフロー内の完了を検出する方法を決定する際に問題があります。TPLデータフローループの完了
GET
リクエストをリモートサーバーに送信し、データレスポンスを処理しているデータフローの一部にフィードバックループがあります(データフローをさらに変換して結果をコミットする)。
データソースは、その結果を1000レコードのページに分割し、使用可能なページの数を教えません。私はちょうど私がデータの完全なページより少なくなるまで読むことを保たなければならない。
通常、ページ数は1です。頻繁に10までです。毎回1000sがあります。
私は最初から多くのフェッチを要求しています。
これに対処するためにスレッドプールを使用できるようにするには、すべて問題ありません。複数のデータ要求をキューに入れて同時に要求できます。多数のページを取得する必要があるインスタンスを見つけた場合、私はこのためにすべてのスレッドを使用したいと考えています。私は他の人たちが終わっている間に一本の糸が残っているのを残したくない。
//generate initial requests for activity
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp));
//fetch the initial requests and feedback more requests to our input buffer if we need to
TransformBlock<DataRequest, DataResponse> fetch = null;
fetch = new TransformBlock<DataRequest, DataResponse>(async req =>
{
var resp = await Fetch(req);
if (resp.Results.Count == 1000)
await fetch.SendAsync(QueueAnotherRequest(req));
return resp;
}
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });
//commit each type of request
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp));
request.LinkTo(fetch);
fetch.LinkTo(commit);
//when are we complete?
QueueRequests
がIEnumerable<DataRequest>
を生成します。私のような、データフローには、このロジックをドロップすると
私が持っている問題があります。次のNページのリクエストを一度にキューに入れ、これは私が必要とするよりもわずかに多くのコールを送信することを意味します。 DataRequestのインスタンスはLastPageカウンタを共有して、最後のページの後にわたっていることを無意識のうちに要求しないようにします。すべてこれは問題ありません。
問題:
この例で示したように、フェッチの入力バッファにさらに多くの要求をフィードバックしてループすると、補完を通知する(または検出する)方法に問題があります。私はリクエストからのフェッチで完了を設定することはできません。完了がセットされると、私はそれ以上のフィードバックはできません。
フェッチ時に入力バッファと出力バッファが空であることを監視できますが、完了時に要求があってもフェッチがまだビジー状態にあると考えられます。
フェッチがビジーであることがわかっています(入力があったか入力を処理中です)。
これを解決するための明白な方法がないのですか?
さらにリクエストをキューに入れるのではなく、フェッチ内でループする可能性があります。その問題は、設定した最大数のスレッドを使用して、リモートサーバーに何をしているのかを調整できるようにすることです。ブロック内の並列ループがブロック自体とスケジューラを共有し、その結果のスレッド数をスケジューラを介して制御できますか?
完了シグナリングを処理するためにfetch用のカスタム変換ブロックを作成することができました。このような単純なシナリオのための多くの作業のようです。
ご協力いただきありがとうございました!
今、すべての要求が最初のブロックで生成された瞬間にですか? – VMAtm
ええ、パイプラインを開始するには、私は 'foreach(todolistのvar c){request.Post(c); }; '。それから私は 'request.Complete();'を呼び出すことができます。これ以上のリクエストは追加しません。 – ajk
@ajkもしあなたがしているのであれば、すべてのブロックリンクで 'a.LinkTo(b、new DataflowLinkOptions {PropagateCompletion = true})'を使うのはなぜですか? 'request.Complete()'を呼び出すと、すべての項目がパイプラインのすべての段階を通過した後、 'commit.Completion'が完了状態に移行します。 –