2016-04-01 21 views
0

EventLoopScheduler(少なくとも1つの作業キューに項目がある)でDisposeを呼び出すと、ObjectDisposedExceptionがスローされます。例外はワーカースレッドからスローされます。EventLoopScheduler:Disposeで予期しない動作が発生するObjectDisposedException

私が見て、すでに存在して二つの質問を読んだ:しかし、私はを引用、答えのいくつかは、非常に正確ではないと思います

をEventLoopSchedulerに関するRxの紹介

EventLoopSchedulerはIDisposableを実装しており、Disposeを呼び出すとスレッドが終了することができます。 IDisposableの実装と同様に、作成するリソースのライフタイムを明示的に管理することが適切です。

出典:http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#EventLoopScheduler

彼らが正しくEventLoopSchedulerを使用する方法についての例を提供します。

Observable 
    .Using(()=>new EventLoopScheduler(), els=> GetPrices(els)) 
    .Subscribe(...) 

残念ながら、この例では:-)少なくともではない私にとっては(動作しません。コードのこの部分を考える:

internal class Program 
{ 
    private static void Main(string[] args) 
    { 
     var source = new Subject<string>(); 
     var subscription = Observable.Using(
      () => new EventLoopScheduler(), 
      scheduler => source 
       .ObserveOn(scheduler) 
       .Do(LongRunningAction)) 
      .Subscribe(); 

     source.OnNext("First action (2 seconds)"); 
     Thread.Sleep(TimeSpan.FromSeconds(1)); 
     subscription.Dispose(); // Scheduler is still busy! 
     Console.ReadLine(); 
    } 

    private static void LongRunningAction(string text) { 
     Thread.Sleep(TimeSpan.FromSeconds(2)); 
     Console.WriteLine(text); 
    } 
} 

私は(サブスクリプションが1秒後に配置されているにもかかわらず)エラーなし2秒後にテキストメッセージを参照してください期待されます。 EventLoopSchedulerは進行中の操作をキャンセルすることはできません。それは私にとっては大丈夫です。

あなたが実際に手に入れたのは、メッセージで、未処理のObjectDisposedExceptionです。

これはバグですか、それとも間違っていますか? :-)

この例外を回避するには、現在EventLoopSchedulerをラップし、scheduler.Schedule(() => scheduler.Dispose())wrapper.Dispose()に呼び出してください。

+0

私はあなたがリンクしたという私の答えを見直しました。本当に追加すべきものが何であるかわからない - 例外の根拠についてはかなり徹底的に説明していると思う。 –

+0

こんにちはジェームズ、あなたのコメントありがとう! 1.あなたは書いています_私はそれが 'IDisposable'を実装しているので真実ではないと思っています。これらのスケジューラーを処分しないと、多くのオープンスレッドが発生します。スケジューラーは** **ガベージコレクションではありません。そのため、私はObservable.Using'を使って提案されたパターンを使用したのです。2. LongRunningActionは単なる例でした。マルチスレッド環境では、どのくらい時間がかかるかは分かりません。登録解除がいつ行われるかは誰にも知られていません。 –

+0

OK、あなたはあなたの邪悪な半引用符で私を広げてしまった。 :)私の答えを参照してください。 –

答えて

0

上記の私のコメントはJamesの答えです。この「回答」は、問題を「修正」するサンプルコードを提供するためのものです。

しかし、私はEventLoopSchedulerにバグがあると思います。私は、それが処分されていれば再帰的に作業をスケジュールし続けるべきだとは思わない。

void Main() 
{ 
    //In my example GetPrices is the source. 
    // I meant that you could use an ELS to do some heavy work to get prices. 
    //var source = new Subject<string>(); 
    var subscription = Observable.Using(
     () => new EventLoopScheduler(), 
     scheduler => 
     { 
      return Observable.Create<string>((obs, ct) => 
      { 
       var scheduleItem = scheduler.Schedule(0, (state,self) => { 
        //Do work to get price (network request? or Heavy CPU work?) 
        var price = state.ToString("c"); 
        LongRunningAction(price); 
        obs.OnNext(price); 
        //Without this check, we see that the Scheduler will try to 
        // recursively call itself even when disposed. 
        if(!ct.IsCancellationRequested) 
         self(state+1); 
       }); 
       return Task.FromResult(scheduleItem); 
      }); 
     }) 
     .Subscribe(); 

    Thread.Sleep(TimeSpan.FromSeconds(1)); 
    subscription.Dispose(); // Scheduler is still busy! 
    Console.ReadLine(); 
} 

private static void LongRunningAction(string text) 
{ 
    Thread.Sleep(TimeSpan.FromSeconds(2)); 
    Console.WriteLine(text); 
} 

*ただし、私がそうでないと確信したとき、私は完全に私の心を変える権利を留保します。

FWIW:一般的に私はELSをサービス内のreadonlyフィールドとして使用しているに過ぎません。これは、スレッドにいくつかのインバウンド作業の処理専用にしたいものです。例えば私は、そのサービスのためにネットワークまたはディスクから読み取るために1つのスレッドを使用したいだけです。この場合、私はELSを作成し、それはどんな仕事もします。それを含むクラスが配置されるときに配置されます。 IntroToRx.comのサンプルが示すように、私は頻繁にそれを使用するとは思わない。

+0

あなたの答えをありがとう!私はそれをテストするために私の例を書き換えます。 –

0

したがって、文脈から引用されているので、私は応答することを余儀なくされています。 :)引用符を重要なビットに広げてみましょう:

あなたはそのEventLoopSchedulerを処分することはできません! を他のRxオペレーターに渡すと、あなたはその責任を に渡しました。

問題は、あなたがオブザーバー(加入者)を持つスケジューラをクリーンアップしようとしていること、です。しかし、オブザーバーはにスケジューラーを渡しましたです。スケジューラーを破棄したい場合は、スケジューラーを現在所有している観測対象であると考える必要があります。観測可能では知っている:

  • それは
  • それは、その情報に基づいて加入者

だonCompleteの/ ONERRORすべてに送信されたとき、それは完全に置かれていますから解除されると、それが

  • に加入したときそれが与えられたスケジューラがいつ廃棄されるかを知ることができる。 (それでも、一般的なクリーンアップをしようとしている場合は、ファイナライザでスケジューラを処分する必要があります。これは、別の加入者が特別な知識なしに行かないことを保証できる唯一の点です)

    ただし、個々の加入者はこの情報を保持する保証はありません。潜在的な他の加入者の知識、および最後のイベントが送信されたときに公開されません。観測可能なスケジューラは、あらゆる種類のファンキーな方法でパーティーに参加できます。多くの睡眠をとるクレイジーメソッドを呼び出す;薄い空気から出来事を作り出すのは、それが気になるからです。次の火曜日までイベントを遅らせます。冷蔵庫にメモをつけることで契約解除イベントに応答し、そのことを約束するmañana、正直。

    だから、そのスケジューラを毎回安全にクリーンアップしたいのですか?それからあなたは観察可能にする必要があります。

    組み込みの演算子ではこれを気にする必要はありません。ほとんどの使用例では必要ないため、大きな懸念事項ではないと思われます。実際には、私は今までプログラムの寿命のために使用されてきたEventLoopSchedulerを処分する必要があるケースを見たことはないと思います。あなたが見ているすべてのIDisposableを処分する必要があると思ってしまうのは簡単ですが、実際にはRxでは必要ではありません(特にサブスクリプションでは、サブスクリプションをキャンセルするという要求があります。 。。資源IDisposableは完璧なサブスクリプション・ハンドルを作ったときのRxチームは別のインターフェイスを作成したくなかった)

    EventLoopSchedulerは、それが忙しくない時に、スレッドの一時停止 - ので、ほとんどの時間は、あなただけの心配する必要はありませんあなたがそれらのいくつかの任意の数を作成していない限り、クリーンアップについて(ヒント:あなたは本当にこれを行う必要はありません)。

    あなたは、あなたが実際にスケジューラキューが空の場合は、スレッドを終了する特殊な秘密(すなわち内部)モードでは、カバーの下にEventLoopSchedulerを使用する、NewThreadSchedulerが代わりに行います場合は見たいと思うかもしれませんならば - それ以外の場合はを再利用します。はい、逆の一般的な誤解にもかかわらず、NewThreadSchedulerスレッドを再利用します。したがって、単一のサブスクライバからの負荷の下で大きなスレッド作成コストがかかりません。複数のスレッドが表示されたり、アイドル状態になって次のイベントが発生すると、スレッド作成が行われます。

    しかし、あなたはEventLoopSchedulerを使用している場合、あなたはおそらく(結局 - それは、イベントが通常行うループものだ - アプリで単一のスレッドにイベントを一元管理)1つのグローバルに共有イベントループに物事を結びつけるために一つの場所でそれを使用しています - そのスレッドをクリーンアップすることはめったに必要ありません。プロセスがとにかく終了したときに実行されるからです。

  • +0

    こんにちはジェームズ、私は "新聞"を演奏してあなたの見積もりの​​一部を残して申し訳ありません。 –

    +0

    「EventLoopScheduler」の正確な実装を誰かが知っていれば、あなたの答えは妥当と聞こえます。実際にはもっと多くの質問が開かれます。なぜ公式文書は*間違った例*を提案するのですか? - あるいは、「Observable.Using」が存在するのはなぜですか? 'EventLoopSchedulers'を作成して処理するための有効なユースケースがあると仮定します(私はダイナミックに割り当てられたアクターのために受信ボックスとして使用します) - エラーを見つけたり、ソースコードを読むことなく、スケジューラをファイナライザに配置しますか?私が考える時間が長くなればなるほど、*意図されていない/バグ*のように見えます。 –

    +0

    私は通常、あなたは加入者をきつくコントロールするだろうと思っています。処分がOKであることを知るのに十分です。確かにアクター型のモデルでは、イベントのシリアル化を保証するイベントループは必要ありません。なぜなら、Rxはあなたにそれを与えるからです。 –

    0

    わかりました。しかし、それはスレッドセーフではない、関連コード行はコメントでマークされています。私はバグチケットを開くべきだと思います: -/

    private static void Main(string[] args) 
    { 
        var originSource = new Subject<string>(); 
        var subscription = UsingEventLoop(originSource) 
         .Do(LongRunningAction) // runs on EventLoopScheduler thread 
         .Subscribe(); 
    
        originSource.OnNext("First action (appears after 2 seconds)"); 
        originSource.OnNext("Second action (must not appear"); 
    
        Thread.Sleep(TimeSpan.FromSeconds(1)); 
        subscription.Dispose(); // Scheduler is still busy with first action! 
    
        Console.WriteLine("Press any key to exit."); 
        Console.ReadLine(); 
    } 
    
    private static IObservable<TValue> UsingEventLoop<TValue>(IObservable<TValue> source) 
    { 
        return Observable.Using(
         () => new EventLoopScheduler(), 
         scheduler => Observable.Create<TValue>((obs, ct) => 
         { 
          return Task.FromResult(source.Subscribe(value => 
          { 
           // The following check+call is NOT thread safe! 
           if (!ct.IsCancellationRequested) 
           { 
            scheduler.Schedule(() => obs.OnNext(value)); 
           } 
          })); 
         })); 
    } 
    
    private static void LongRunningAction<TValue>(TValue value) { 
        Thread.Sleep(TimeSpan.FromSeconds(2)); 
        Console.WriteLine(value); 
    } 
    
    関連する問題