2017-08-16 8 views
0

RxJavaで消費者/プロデューサを開発しました。次の記事の記事rxjava2 producer-consumer exampleに基づいています。すべてのユーザがRxJava2でアイテムを受信したときにイベントをトリガする方法は?

しかし私のユースケースは少し複雑です。

私は(加入者とそのフィルターの量は、実行時に変化します)フィルタのいくつかの並べ替えで、それぞれ1つずつ、複数の加入者を使用することがありますし、加入者のいずれかによって処理されることはありませんいくつかのitensがあるでしょう。

私はいくつかの後処理を行うために残っているアイテム(サブスクライバのいずれにも適格ではない)をすべて取得する必要があります。

私はプロデューサーにメソッド「onAfterNextを()」を使用しようとしたが、THS方法は、アイテムが加入者

によって処理される前にそれを行うための方法はあります実行しますか?

答えて

0

私のリスナーを追加するにはPublishSubjectクラスを使用してこれを解決しました。

PublishSubject<SQSMessage<String>> ps = PublishSubject.create() 
ps.publish(); 
ps.subscribe(m -> System.err.println(m)); 

flowable.parallel() 
    .runOn(Schedulers.io()) 
    .map(item -> { 
     // This map sends the object to my listeners on PublishSubject 
     ps.onNext(item); 
     return item; 
    }) 
    .sequential() 
    // This map does the post processing after 
    // the object was sent to the listeners 
    .map(s -> {//do post process stuff}) 
    .subscribe(); 
関連する問題