2016-05-03 5 views
7

Observableの最新の値を取得し、それが呼び出された直後に を放出しようとしています。一例として、以下のコードを考える:Observableの最新値を取得して即座に出力します

return Observable.just(myObservable.last()) 
    .flatMap(myObservable1 -> { 
     return myObservable1; 
    }) 
    .map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object 

これはflatMapが順番にmapに到達するために発光するように を持っていますmyObservable1を放出する、これを行うことであるため動作しません。 このようなことが可能であるかどうかわかりません。誰もこの目標を達成するための手がかりを持っていますか?ありがとうございます

+0

あなたは何の話をしていますか?最新のことはどういう意味ですか? – akarnokd

+0

「myObservable」は、不規則な間隔で例えば「1」、「2」、「3」を放出する熱い観察可能なものである。私がしたいことは、 'return'ディレクティブに到達した時点で' myObservable'の最新値(私たちの場合は "3")を取得できることです。 –

+0

myObservableは暑いまたは寒いですか? –

答えて

17

last()ここではObservableが最後に放出されたアイテムを与えるのを待つので、ここでは何の助けにもなりません。

放射観測可能性を制御できないと仮定すると、単純にBehaviorSubjectを作成し、それをリッスンして作成したサブジェクトにサブスクライブするデータを放出するオブザーバブルに登録することができます。 SubjectObservableSubscriberの両方であるため、必要なものを手に入れることができます。

私は今すぐ確認する時間がないと思うので、購読者がすべて自動的に購読を停止しないと、BehaviorSubjectとして元の観察可能リストから手動で購読を解除する必要があるかもしれません。

このような何か:RxJavaで

BehaviorSubject subject = new BehaviorSubject(); 
hotObservable.subscribe(subject); 
subject.subscribe(thing -> { 
    // Here just after subscribing 
    // you will receive the last emitted item, if there was any. 
    // You can also always supply the first item to the behavior subject 
}); 

http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html

0

は、subscriber.onXXXが呼び出されasynchronous.Itは新しいスレッドでの観察可能なEMITアイテムならば、あなたは前に最後の項目を取得することはできないことを意味しあなたがスレッドをブロックしてアイテムを待つ以外は返す。しかしObservableが同期的にアイテムを放出し、subscribeOnとobservOnによってスレッドを変更しないと、

Observable.just(1,2,3).subscribe(); 
01のように この場合

は、あなたがこのようにすることによって最後の項目を取得することができます:

Integer getLast(Observable<Integer> o){ 
    final int[] ret = new int[1]; 
    Observable.last().subscribe(i -> ret[0] = i); 
    return ret[0]; 
} 

それはthis.RxJavaようにすることは悪い考えだ

は、あなたがそれによって非同期作業を行うことを好みます。

+1

はほとんどの場合動作しません – ndori

0

ここで実際に達成したいのは、非同期タスクをとり、同期タスクに変換することです。それを達成するためのいくつかの方法があります

、それとのそれぞれが長所と短所です:

  • 使用toBlocking()を - それは、ストリームが終了するまで、このスレッドは一つだけを取得するためには、遮断されることを意味しますアイテムは単にアイテムが配送されると完了するのでfirst()を使用してください。 その後すぐに最後の値を取得する方法は、次のようになりますのは、あなたの全体の流れがObservable<T> getData(); あるとしましょう:

public T getLastItem(){ return getData().toBlocking().first(); }

それが完了するのストリームを待ちますようlast()を使用しないでください最後の項目のみが出力されます。

あなたのストリームがネットワークリクエストでまだアイテムを取得していない場合、これはスレッドをブロックします!したがって、アイテムがすぐに利用可能であることが確かであるとき(またはブロックが本当に必要な場合) ...)

  • 別のオプションは、単に、このような何かを最後の結果をキャッシュすることです。

    のgetData()サブスクライブ(T-> cachedT = T;)//どこかのコードでと最後に配信されたアイテムを保存し続けます。 public T getLastItem(){ return cachedT; }

あなたはnullを取得するか、設定した初期値何でも、それを要求する時までに送られた任意の項目がなかった場合。 このapprochの問題は、getの後にsubscribeフェーズが起こり、2つの異なるスレッドで競合条件が発生する可能性があるということです。

関連する問題