2016-04-14 2 views
0

何らかの種類のストリーミングパーサーを実装しようとしています。私は整数のストリームを持っていて、ストリームの一部を集約する新しいオブジェクトを作成するためにそれらを結合します。RxJavaを使用して、オブザーバブルオブジェクトから新しいオブジェクトに複数のアイテムを結合します。

たとえば、整数が負の場合、オブジェクトは「完了」です。簡単にするために、生成されたアイテムは数字の文字列になります。ここで

は簡単な例です:

Source: 1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11 
Output: "1-2", "343-5", "6-7", "8910-11" 

私は、既存事業者の適切な組み合わせを見つけることができません。私は彼らにいくつかの位置の演算子を使用するまで、それはテイク(N)またはのように、うまく機能

rx.Observable<Integer> src = rx.Observable 
    .from(Arrays.asList(1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11)); 

rx.Observable<String> res = src 
    .lift(new rx.Observable.Operator<String, Integer>() { 

     @Override 
     public rx.Subscriber<? super Integer> call(
      final rx.Subscriber<? super String> subscriber) { 

      return new rx.Subscriber<Integer>(subscriber) { 

       private final StringBuilder cur = new StringBuilder(); 

       @Override 
       public void onCompleted() { 
        if (!subscriber.isUnsubscribed()) { 
         subscriber.onCompleted(); 
        } 
       } 

       @Override 
       public void onError(Throwable e) { 
        if (!subscriber.isUnsubscribed()) { 
         subscriber.onError(e); 
        } 
       } 

       @Override 
       public void onNext(Integer i) { 
        if (subscriber.isUnsubscribed()) 
         return; 

        cur.append(i).append(''); 
        if (i < 0) { 
         subscriber.onNext(cur.toString()); 
         cur.delete(0, cur.length()); 
        } 
       } 
      }; 
     } 
    }); 

rx.observers.TestSubscriber<String> sub = 
    new rx.observers.TestSubscriber<>(); 
res.subscribe(sub); 
List<String> l1 = sub.getOnNextEvents(); 

:私のソリューションは、「チャンク」は有効であり、新しいオブジェクトを発するまでのデータを集計したカスタムオペレータを、提供することでしたcombineLatest。この状況で

rx.Observable<String> res2 = res.take(2); 

私はのonErrorをonCompleted取得しないと、項目の少ない量が放出されます。

次のように入力すると、の次にはが出力され、にはが出力されます。

アイデア?

答えて

0

Hmm。長いポストを書くことは新しいアイデアを生み出すのに役立ち、それは機能します。 (私は2日前に壁に向かった)。

Observer<Observer<String>>を生成し、オブジェクトが準備完了になるまでObserver.empty()を発行します。 flatMapの後に。ここで

は実施例である:

rx.Observable<rx.Observable<String>> res = src 
    .lift(new rx.Observable.Operator<rx.Observable<String>, Integer>() { 

     @Override 
     public rx.Subscriber<? super Integer> call(
      final rx.Subscriber<? super rx.Observable<String>> subscriber) { 

      return new rx.Subscriber<Integer>(subscriber) { 

       private final StringBuilder cur = new StringBuilder(); 

       // onError, onCompleted skipped 

       @Override 
       public void onNext(Integer i) { 
        rx.Observable<String> out; 
        cur.append(i); 
        if (i < 0) { 
         out = rx.Observable.just(cur.toString()); 
         cur.delete(0, cur.length()); 
        } else { 
         out = rx.Observable.<String>empty(); 
        } 
        subscriber.onNext(out); 
       } 
      }; 
     } 
    }); 

rx.Observable<String> res2 = res 
    .flatMap(stringObservable -> stringObservable) 
    .take(2);