2017-08-07 4 views
0

io.reactivex.ObservableEmitter<T>からio.reactivex.Observer<T>への簡単な変換方法はありますか?私はrx-java2ライブラリでそれを行う機能を見つけることができませんでした。rx-java2の `ObservableEmitter`から` Observer`への変換

実装は自明であると思われる:

public static <T> Observer<T> toObserver(ObservableEmitter<T> oe) { 
    return new Observer<T>() { 

     @Override 
     public void onSubscribe(Disposable d) { 
      oe.setDisposable(d); 
     } 

     @Override 
     public void onNext(T t) { 
      oe.onNext(t); 
     } 

     @Override 
     public void onError(Throwable e) { 
      oe.onError(e); 
     } 

     @Override 
     public void onComplete() { 
      oe.onComplete(); 
     } 
    }; 
} 

それはrx-java2における2つのコアタイプの間の変換を提供するように、それは、それが標準ライブラリ実装の一部であることを感じています。

基本的にI 2 OnSubscribeがrxjava 2で、私は2つの他のObservable'sにサブスクライブするために使用できるSubscriberインターフェースを提供すること

class X<T, O1, O2> implements Transformer<T, Either<O1, O2>> { 

Transformer<T, O1> t1; 
Transformer<T, O2> t2; 

@Override 
public Observable<Either<O1, O2>> call(Observable<T> input) { 
    return input.flatMap(new Func1<T, Observable<Either<O1, O2>>>() { 
     @Override 
     public Observable<Either<O1, O2>> call(final T t) { 
      return Observable.<Either<O1, O2>>create(new OnSubscribe<Either<O1, O2>>() { 
       @Override 
       public void call(final Subscriber<? super Either<O1, O2>> sub) { 
        t1.call(Observable.just(t)).map(o1 -> Either.<O1, O2>left(o1)).subscribe(sub); 
        t2.call(Observable.just(t)).map(o2 -> Either.<O1, O2>right(o2)).subscribe(sub); 
       } 
      }); 
     } 
    }); 
} 

}

Noticeをrxjava 1から次のコードを移行しようとしてい変換が必要です。

+1

なぜだろうので、標準ライブラリにはそのようなことはありませんあなたもこれをする?あなたのエミッタを 'Observable'に「購読」したいのであれば、実際には' Observable'に直接購読するべきです。サブスクリプションでObservableを作成する場合は、 'create'の代わりに' defer() 'を使います。 – akarnokd

+0

私は2つの変圧器 'ObservableTransformer t1'と' ObservableTransformer t2'を持っており、それらをObservableTransformer >に組み合わせたいと思います。それぞれの 'T'に対して' Observable.create'を呼び出す関数を持つ結果トランスフォーマーの 'flatMap'のapplyメソッドです。 'Observable.create'は' ObservableOnSubscribe'を 'ObservableEmitter'と取り、' t1'と 't2'の結果にそのエミッタをサブスクライブしたいと思います。 rxjava 1では 'subscribe'は同じ' Subscriber'インタフェースを受け入れます。 – hgrey

+0

はい、上記のコードを延期して単純化することができます...ありがとう@akarnokd – hgrey

答えて

1

publish(Function)のように見えます。 (あなたのコードは畳み込まれ、v1のプロトコルに違反していました)。

ObservableTransformer<T, O1> t1 = ... 
ObservableTransformer<T, O2> t2 = ... 

ObservableTransformer<T, Either<O1, O2>> combiner = o -> 
    o.publish(g -> Observable.merge(
     t1.apply(g).map(o1 -> Either.<O1, O2>left(o1)), 
     t2.apply(g).map(o2 -> Either.<O1, O2>right(o2)) 
    )); 

あなたは本当に代わりにcreatemerge()を使用し、(場合にインナーは非同期に行く)外側flatMapに固執する場合:

return input.flatMap(new Func1<T, Observable<Either<O1, O2>>>() { 
    @Override 
    public Observable<Either<O1, O2>> call(final T t) { 
     Observable<T> just = Observable.just(t); 
     Observable.merge(
      t1.call(just).map(o1 -> Either.<O1, O2>left(o1)), 
      t2.call(just).map(o2 -> Either.<O1, O2>right(o2)) 
     ) 
    } 
}); 
+0

はい、 'publish(Function)'が必要でした。そして、ええ、コードが巻き込まれました。あなたの答え 'Func1'に対するマイナーな修正はもはや存在せず、' Observable.wrap'呼び出しは 't1.apply(g)'に必要です。 'Observable.just'で観測可能なものを作成し、それをidentity関数で' flatMap'することも可能ですが、 'merge'がこれに適していますか? – hgrey

+0

おっと、 'merge'のソースコードを見て、これは' fromArray'とそれに続く 'flatMap'でどのように実装されているのですか? – hgrey

関連する問題