2017-06-13 5 views
1

AとBの2つのストリームがあるとします。要素を再利用できる2つの反応性ストリームを 'Zip'します。

ストリーム1:A1、A2、A3、A4、A5、A6 ストリームB:B1、B2、B3、B4

タイムライン:B1> A1> A2> B2> A3> A4> B3> A5 所望の出力:[B1 & A1]、[A2 & B2]、[B2 & A3]、[A4 & B3]、[B3 & A5]、[A6 & B4]

これはここで現実のプロセスを反映Bは両面で積み重ねられています。例えば、

---- B1 ----] 
      |-1 
---- A1 ----] 

---- A2 ----] 
      ]-2 
---- B2 ----|] 
      ]|-3 
---- A3 ---- ] 

---- A4 ----] 
      |-4 
---- B3 ----]] 
      |-5 
---- A5 ---- ] 

---- A6 ----] 
      |-6 
---- B4 ----] 

グルーピング1は最初のペアの特徴です。グループ化6は、端末ペアリングの特徴である。グループ2/3および4/5を繰り返すことができます。

私はRxJava2でこれを達成するためのアイデアを探しています。

ありがとう、Dan。

import java.util.Objects; 

import org.junit.Test; 

import io.reactivex.Flowable; 
import io.reactivex.functions.BiFunction; 
import io.reactivex.processors.FlowableProcessor; 
import io.reactivex.processors.ReplayProcessor; 

public class BuildControllerTest { 
    @Test 
    public void testProductBlanketSequencing() throws Exception {   
     final String a1="A1", a2="A2", a3="A3", a4="A4", a5="A5", a6="A6"; 
     final String b1="B1", b2="B2", b3="B3", b4="B4"; 

     class AB { 
      String a; 
      String b; 

      public AB(String a, String b) { 
       this.a = a; 
       this.b = b; 
      } 

      @Override public boolean equals(Object obj) { 
       if(getClass() != obj.getClass()) return false; 
       if(! Objects.equals(a, ((AB)obj).a)) return false; 
       if(! Objects.equals(b, ((AB)obj).b)) return false; 
       return true; 
      } 
     } 

     final FlowableProcessor<String> a = ReplayProcessor.create(); 
     final FlowableProcessor<String> b = ReplayProcessor.create(); 

     b.onNext(b1); 
     a.onNext(a1); 

     a.onNext(a2); 
     b.onNext(b2); 
     a.onNext(a3); 

     a.onNext(a4); 
     b.onNext(b3); 
     a.onNext(a5); 

     a.onNext(a6); 
     b.onNext(b4); 

     a.onComplete(); b.onComplete(); 

     Flowable.zip(a, b, new BiFunction<String, String, AB>() { 
      @Override public AB apply(String t1, String t2) throws Exception { 
       return new AB(t1, t2); 
      } 
     }) 
     .test() 
     .assertResult(
       new AB(a1, b1), 
       new AB(a2, b2), 
       new AB(a3, b2), 
       new AB(a4, b3), 
       new AB(a5, b3), 
       new AB(a6, b4)); 
    } 
} 

答えて

1

あなたの例から、ペアリングを駆動するものを条件明確ではありません。

EDITは、私がテストケースを作りました。

私たちはペアは、両方のソースとすぐに形成されzipを使用

は、次の項目があります。

(A1, B1), (A2, B2), (A3, B3), (A4, B4) 

任意のソース内の次の項目がある場合、我々はペアが形成されてcombineLatestを使用する場合:

(A1, B1), (A2, B1), (A2, B2), (A3, B2), (A4, B2), (A4, B3), (A5, B3), (A6, B3), (A6, B4) 
は、

私の場合は、ソースを組み合わせるが、すべてのペアを取ることはないようです。ビジネスルールに応じて余分なペアを削除することができます。

余分なルールは時間であり、次にsamplingを見てください。

Flowable.combineLatest(a, b, (t1, t2) -> new AB(t1, t2)) 
     .throttleLast(1, TimeUnit.MICROSECONDS) 

それは余分な規則がBからソースAからの固有の値とかのう繰り返しを持つことであるなら、あなたはdistinct

Flowable.combineLatest(a, b, (t1, t2) -> new AB(t1, t2)) 
     .distinct(ab -> ab.a) 

(A1, B1), (A2, B1), (A3, B2), (A4, B2), (A5, B3), (A6, B3) 
を使用することができます組み合わせたペアからの時間間隔ごと

(A1, B1), (A4, B3), (A5, B3), (A6, B3), (A6, B4) 

を最新の結果がかかります

+0

combineLatestは私の状況にとっては便利だと思われますが、これまで私は必要なものを作り出すために苦労しました。 1つの問題は、テストケースでシーケンスを制御することです。私はzipで作業し、最初のものの後にBsを倍増させましたが、最終的なAtomicBoolean reset = new AtomicBoolean(true);新しいAB(t1、t2))、zip(a、b.flatMap(t-> reset.getAndSet(false)?Flowable.just(t):Flowable.just ; おそらくこれは悪いですか? – Dan

+0

ああ、最初のものを除いて、Bのすべての要素を繰り返すことを理解していませんでした。そうすればうまくいくはずです。 –

+0

Bsを複製することは目的ではありませんでしたが、圧縮されたときに正しい順序(最初と最後のものは無視します)を生成することになりました。しかし、私の問題はこの例よりも精巧ですが、combineLatestはそのシナリオで非常に有用であることが証明されています。ありがとう! – Dan

関連する問題