2017-06-30 11 views
2

私はマルチスレッド化に新しいので、値を受け取る方法で列挙子を作成しようとしていますが、バックグラウンドで処理を続けます。スレッドからの値の処理yield returnの実行

は、私はこのように、最終的なクラスを使用することができるようにしたい:

while (await asyncEnumerator.MoveNextAsync()) 
{ 
     T result = asyncEnum.Current; 
     //Do Something 
} 

しかし、私はそれが私はwhileループで何かをやっている間に次の結果を得ていきたいと思います。

コメント:各結果は前の結果に依存するので、それを並列に処理することができません。

私は多分キューの結果を持っていると思って、キュー内の最初の結果を返している間に結果を得るためにスレッドを処理していましたが、これを動作させることはできません。これは私の試みですが、私は何をすべきか分からないMoveNext():私はコード私が知りたいのですが、これを行うには良い方法があるかどうかは明らか

public class AsyncEnumerator<T> 
{ 
    private object _sync = new object(); 

    private readonly Queue<T> _queue; 
    private readonly IEnumerable<T> _enumerable; 

    public AsyncEnumerator(IEnumerable<T> enumerable) 
    { 
     _enumerable = enumerable; 
     _queue = new Queue<T>(); 
    } 

    /// <summary> 
    /// Start getting results when requested the firstTime 
    /// </summary> 
    private bool _startFlag; 

    public async Task<bool> MoveNextAsync() 
    { 
     if (_finished) 
     { 
      return false; 
     } 
     if (!_startFlag) 
     { 
      _startFlag = true; 
      SetResultsAsync(); 
     } 
     if (_queue.Count > 0) 
     { 
      Current = _queue.Dequeue(); 
      return true; 
     } 
     //What here? 
     return await Task.Run(()=> true); 
    } 

    private T _current; 
    public T Current 
    { 
     get 
     { 
      lock (_sync) 
      { 
       return _current; 
      } 
     } 
     private set { _current = value; } 
    } 

    private bool _finished; 

    private async Task SetResultsAsync() 
    { 
     IEnumerator<T> enumerator = _enumerable.GetEnumerator(); 
     bool moveNext = await Task.Run(() => enumerator.MoveNext()); 
     while (moveNext) 
     { 
      _queue.Enqueue(enumerator.Current); 
      moveNext = await Task.Run(() => enumerator.MoveNext()); 
     } 
     _finished = true; 
    } 
} 

を更新しました

、Iマルチスレッド化に新しいです。

+0

あなたはおそらく問題を引き起こすasync voidメソッドを使用すべきではありません。 MoveNextAsync asyncを作成し、SetResultsAsyncがタスクを返すようにします。ところで、 'IAsyncEnumerable 'と 'IAsyncEnumerator 'があります。 –

+0

@CamiloTerevintoまだ 'MoveNextAsync()'で私を助けません... コード内のものを修正しました – tjw

+0

SetResultsAsyncはMoveNextAsyncで待っています。それ以外に、キューが空の場合、それはすべてのアイテムがすでに処理されたということではありませんか?その場合はfalseを返します。 –

答えて

4

しかし、私はwhileループで何かをしている間、次の結果を得たいと思っています。

ここでは、単純な非同期列挙子が間違った解決策です。列挙子は「プル」技術です。消費コードが次の項目を要求したときにのみ処理を行います。

「プロデューサ」と「コンシューマ」の問題について考えてみましょう。消費者がそれを要求したときにだけプロデューサが動作する場合は、列挙子のシナリオがあります。あなたのケースでは、プロデューサが独立してアイテムを作成したいので、列挙子は正しくフィットしません。

1つのオプションはreactive extensionsです。観測値は、観測値が「プッシュ」技術であることを除いて、列挙子に類似した一連の値を表します。したがって、プロデューサーは消費者に価値を押しつけ、新しい価値に反応します(したがってReactiveという名前)。

Rxはあなたにはうってつけのようですが、かなり高い学習曲線があります。

より単純なオプションは、より伝統的なプロデューサ/コンシューマキューを使用することです。非同期に消費するので、非同期互換のプロデューサ/コンシューマキューが必要になります。 BufferBlock<T>one solutionであり、AsyncProducerConsumerQueue<T>my AsyncEx libraryである。どちらの場合も

、あなたはプロデューサーをキックオフしてから消費したい:

var queue = new BufferBlock<T>(); 

// Kick off producer 
var producer = Task.Run(() => 
{ 
    while (...) 
    { 
    T value = ...; 
    queue.Post(value); 
    } 
    queue.Complete(); 
}); 

// Consumer code 
while (await queue.OutputAvailableAsync()) 
{ 
    var value = await queue.ReceiveAsync(); 
    ... 
} 

あなたが正しくプロデューサーコードから例外を検出できるように最終的に、あなたは、awaitproducerそのタスクになるでしょう。

+0

なぜあなたは2回「待っていますか?一度は十分ではありませんか? – tjw

+0

技術的には、 'OutputAvailableAsync'が' true'を返すと、次の 'ReceiveAsync'が同期して終了するので、代わりに' Receive'を使うことができます。一貫性のために 'ReceiveAsync'を使用します。 –

関連する問題