2017-04-02 16 views
4

今私はObservable<Observable<Integer>を保持していますが、どのようにしてそれをn-aryデカルト積を含むObservable<int[]>に転送できますか?例えば独立したデカルト積のinRxJava

:非同期の方法で、デカルト積を作成

Observable<Observable<Integer> ob = Observable.just(
    Observable.just(0,1), 
    Observable.just(2,3), 
    Observable.just(4,5) 
); 
ob...... -> (0,2,4), (0,3,4), (0,2,5), (0,3,5), (1,2,4), (1,3,4), (1,2,5), (1,3,5) 

答えて

1

まず、あなたが入力Observable秒の固定番号が必要(toArray方法はObservable<T>T[]に変換します)。第2に、ブロッキングの必要はありませんが、第2、第3などが複数回消費される必要があるため、キャッシングの必要性があります。

import java.util.*; 

import io.reactivex.Observable; 

public class Cartesian { 

    static Observable<int[]> cartesian(Observable<Observable<Integer>> sources) { 
     return sources.toList().flatMapObservable(list -> cartesian(list)); 
    } 

    static Observable<int[]> cartesian(List<Observable<Integer>> sources) { 
     if (sources.size() == 0) { 
      return Observable.<int[]>empty(); 
     } 
     Observable<int[]> main = sources.get(0).map(v -> new int[] { v }); 

     for (int i = 1; i < sources.size(); i++) { 
      int j = i; 
      Observable<Integer> o = sources.get(i).cache(); 
      main = main.flatMap(v -> { 
       return o.map(w -> { 
        int[] arr = Arrays.copyOf(v, j + 1); 
        arr[j] = w; 
        return arr; 
       }); 
      }); 
     } 

     return main; 
    } 

    public static void main(String[] args) { 
     cartesian(Observable.just(
      Observable.just(0, 1), 
      Observable.just(2, 3), 
      Observable.just(4, 5) 
     )) 
     .subscribe(v -> System.out.println(Arrays.toString(v))); 
    } 
} 
0

が難しいまたは不可能であるいくつかの意味です。ブロッキングがOKであれば、あなたはそれをブロックしないようにするには、このコードを向上させることが可能ですが、あなたはまだすべてのObservable sからすべての結果をキャッシュする必要がありますと、コードがはるかに複雑になります。この

public class Main 
{ 

    static class ProductIterator<T> implements Iterator<T[]> 
    { 
     private final List<List<T>> componentsList; 
     private final Class<T> componentClass; 
     private final int[] indices; 
     private boolean hasNext; 

     public ProductIterator(List<List<T>> componentsList, Class<T> componentClass) 
     { 
      this.componentsList = componentsList; 
      this.componentClass = componentClass; 
      this.indices = new int[componentsList.size()]; 
      this.hasNext = this.indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size(); 
     } 

     @Override 
     public boolean hasNext() 
     { 
      return hasNext; 
     } 

     @Override 
     public T[] next() 
     { 
      T[] res = (T[]) Array.newInstance(componentClass, componentsList.size()); 
      for (int i = 0; i < componentsList.size(); i++) 
      { 
       res[i] = componentsList.get(i).get(indices[i]); 
      } 

      // move next 
      indices[0]++; 
      for (int i = 0; i < componentsList.size() - 1; i++) 
      { 
       if (indices[i] == componentsList.get(i).size()) 
       { 
        indices[i] = 0; 
        indices[i + 1]++; 
       } 
      } 
      hasNext = indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size(); 

      return res; 
     } 
    } 

    public static <T> Observable<T[]> product(Observable<Observable<T>> components, Class<T> componentClass) 
    { 
     return Observable.fromIterable(new Iterable<T[]>() 
     { 
      @Override 
      public Iterator<T[]> iterator() 
      { 
       // postpone blocking up until iterator is requested 
       // and by this point we can't postpone anymore 
       Single<List<List<T>>> componentsList = components.map(o -> o.toList().blockingGet()).toList(); 
       return new ProductIterator<T>(componentsList.blockingGet(), componentClass); 
      } 
     }); 
    } 

    public static void main(String[] args) throws Exception 
    { 
     Observable<Observable<Integer>> ob = Observable.just(
       Observable.just(0, 1), 
       Observable.just(2, 3), 
       Observable.just(4, 5) 
     ); 

     Observable<Integer[]> product = product(ob, Integer.class); 
     product.forEach(a -> System.out.println(Arrays.toString(a))); 
    } 
} 

ような何かを行うことができます。おそらくブロックはあなたにとって受け入れられないでしょう、とにかくデカルトの製品は悪い考えです。

0

さて、私はそれを自分で解決できます。しかし、もっとエレガントな方法がありますか?
すべての

Observable<int[]> toObservableArray(Observable<Observable<Integer>> obs) { 
     List<int[]> list = obs.map(ob -> toArray(ob)).toList().toBlocking().last(); 
     return Observable.create(new SyncOnSubscribe<int[], int[]>() { 
      @Override 
      protected int[] generateState() { 
       int[] array = new int[list.size()]; 
       Arrays.fill(array, 0); 
       return array; 
      } 

      @Override 
      protected int[] next(int[] state, Observer<? super int[]> observer) { 
       int[] next = new int[list.size()]; 
       for (int i = 0; i < next.length; i++) { 
        next[i] = list.get(i)[state[i]]; 
       } 
       observer.onNext(next); 
       state[state.length - 1]++; 
       for (int i = state.length - 1; i >= 0; i--) { 
        int delta = list.get(i).length - state[i]; 
        if (delta > 0) { 
         break; 
        } else if (delta == 0) { 
         state[i] = 0; 
         if (i == 0) { 
          observer.onCompleted(); 
          break; 
         } 
         state[i - 1]++; 
        } 
       } 
       return state; 
      } 
     }); 
    }