2011-11-27 6 views
6

stream.DataAvailableがfalseになるまで、以下のobservable repeatを繰り返すにはどうすればよいですか? 現在、それは決して止まらないようです。Fast Repeat TakeWhileにより無限ループが発生する

Deferセクション内のAsyncReadChunkとObservable.Returnは、OnNext呼び出しとOnCompleted呼び出しを行います。 リピートがOnNextコールを受信すると、それをTakeWhileに渡します。 TakeWhileが満足されていないときは観測可能な状態になりますが、OnNextが非常に速いOnCompletedは、Repeatを観察可能に再登録して無限ループを引き起こすようになると思います。

この現象を修正するにはどうすればよいですか?

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) 
{ 
    return Observable.Defer(() => 
     { 
      try 
      { 
       return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]); 
      } 
      catch (Exception) 
      { 
       return Observable.Return(new byte[0]); 
      } 
     }) 
     .Repeat() 
     .TakeWhile((dataChunk, index) => dataChunk.Length > 0); 
} 
+4

あなたの問題を解決する方法を理解していただき、あなたのソリューションを共有していただきありがとうございます。しかし、あなたの質問を編集するのではなく、答えとしてあなたの質問に答えを投稿してください。 –

+0

サメット、私は自問自答を質問から外し、コミュニティwikiと記された別の回答に移しました。 –

答えて

2

SELFのANSWER:(以下サメット、質問の著者によって投稿答えはしかし、彼は質問の一部としての答えを掲載、私は別々にそれを移動しています。答えは、作者がそれ自身を移動していないことから、コミュニティのwikiとしてマークする。)


私はそれはスケジューラに問題があることをリファクタリングによって発見されました。 ReturnはCurrentThreadを使用している間に即時スケジューラを使用します。固定コードは以下の通りです。

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) 
    { 
     return Observable.Defer(() => 
            { 
             try 
             { 
              return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread); 
             } 
             catch (Exception) 
             { 
              return Observable.Return(new byte[0], Scheduler.CurrentThread); 
             } 
            }) 
      .Repeat() 
      .TakeWhile((dataChunk, index) => dataChunk.Length > 0); 
    } 
関連する問題