2017-03-19 3 views
1

私は一連のイベントを持っています。タイプはAかBのいずれかです。このシーケンスをテキストファイルから読みます。ReactiveX `withLatestFrom`と分離して参加します

私はこのシーケンスで観察を作成する:

A1-B-B-A2-B-B-B

、それらは、そのフィルタ:

Astream = stream.filter(trueIfA)

Bstream = stream.filter(trueIfB )

A: A1-----A2------ 
B: --B-B-----B-B-B 

しかし、01を使用すると

A2B-A2B----A2B-A2B-A2B 

なぜこれが事実である:、私はで終わりますか? A1B-A1B-A2B-A2B-A2Bのシーケンスを得るために、どうすればそれを操作できますか?

答えて

1

Observable.combineLatest()を使用する必要があります。Observableのいずれかから最新の排出を取得したい場合は、いずれかのアイテムが放出されます。
withLatestFrom()オペレータがcombineLatestと同様であるが、単一のソースObservable(「B」)がオペレータに渡されObservable Sのいずれかがない場合、アイテムを発するときcombineLatestが行うようのみ、アイテムを発します。 source

1

超えyosriz」と答え、あなたはまた、元のソースをマルチキャストするpublish(Func1)が必要:

Observable<T> source = ... 

source.publish(o -> 
    Observable.combineLatest(o.filter(trueIfA), o.filter(trueIfB), 
     (a, b) -> concat)) 
.subscribe(...) 
+0

は、両方のソリューションを試してみましたが、何の影響も与えていないようです、今までのストリームがオンになっているようです右には常に最後の要素のみが含まれます。 – Xrave

+0

あなたのソースは正確ですか?彼らは適切に歩調を合わせていますか?もしそれらが同期していれば、実際にcombineLatestはあなたのために働かないでしょう。 – akarnokd

+0

私のソースは配列のように、すべての行をListに読み込み、そのリストからObservableを作成します。それは "同期"ですか?私は、Rx APiが静的データを使って「順序」と「時間」を強制することを期待していたと思いますが、誤解されている可能性があります。 – Xrave

関連する問題