2017-06-30 4 views
2

EDIT:いくつかのコード行を編集しました。IDEで実行すると、エラーや何もせずに失敗します。リアクティブエクステンション - 同期式でSubject/IObservableをフラッシュする

Reactive Extensionsを初めて使用していて、私が整理しようとしている問題があります。私はRXを使ってマシン上のイベントをキューに入れています。そして、そのたびにそのデータをサーバに送信します。私の問題は、アプリケーションがシャットダウンしているときに、あらゆる種類の非同期呼び出しであるものは、キャンセルして実行していないように見えるので、イベントの最後のバッチは送信されません。

件名があります。ここで、イベントは自分のデータクラスです。私はSubjectが使える最高のクラスではないかもしれないことを知っていますが、ここにはあります。

私のコードは、主に次のようになります、わかりやすくするためにいくつかのコメントを追加しました:

IObservable<IList<Event>> eventsObserver = Instance.EventBuffer.ToList<Event>(); 
var eventsEnumerable = eventsObserver.ToEnumerable(); 
List<Event> events = new List<Event>(); 
try 
{ 
    events = (List<Event>)eventsEnumerable.First(); // THIS LINE FAILS SILENTLY, EVEN IN DEBUGGER... 
} 

catch(Exception ex) 
{ 
    System.Diagnostics.Debug.WriteLine("Error: " + ex.Message); 
} 

using (var client = new HttpClient()) 
{ 
    client.BaseAddress = new Uri(someURI); 
    HttpResponseMessage response = client.PostAsync(somePage, new StringContent(SerializeToJSON(events))).Result; 
    response.EnsureSuccessStatusCode(); 
} 

私は(「.Result」が)Webサーバの同期への呼び出しを行わない場合は、それが失敗しましたそこ。私は、IObservableから送信できるものにデータを取得するためにさまざまな方法を試しましたが、コードは失敗します(通常はある種の悪いキャストで失敗します)。または、イベントがデータ構造にまだ入れられていません。送信したい。私はRXが本質的に非同期であることを知っています、そして私はそれを同期的な方法で対処することを求めています、私は解決策があると思います。前もって感謝します!

+0

'eventsEnumerable.First()'の呼び出しは、観測可能な 'Instance.EventBuffer'が' OnCompleted'で終わる場合にのみ完了します。それは起こるのだろうか?私たちに見せるために[mcve]を提供できますか? – Enigmativity

答えて

0

Observableソースを制御すると、Enigmativityが指摘したようにObservable.OnComplete()を呼び出すことができます。そうでなければ、あなたはそれをバッファリングする前に受け取ったすべての値のコピーを保持しようとすることができます:

Observable.Do(x => localCopy = x).Buffer(..) 

このローカルコピーはシャットダウン時にあなたがアクセス可能になります。

いずれの場合でも、最新のRxバージョンでは.First()が廃止されており、発生する可能性のある問題を避けることができます。

関連する問題