何らかの種類のストリーミングパーサーを実装しようとしています。私は整数のストリームを持っていて、ストリームの一部を集約する新しいオブジェクトを作成するためにそれらを結合します。RxJavaを使用して、オブザーバブルオブジェクトから新しいオブジェクトに複数のアイテムを結合します。
たとえば、整数が負の場合、オブジェクトは「完了」です。簡単にするために、生成されたアイテムは数字の文字列になります。ここで
は簡単な例です:
Source: 1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11
Output: "1-2", "343-5", "6-7", "8910-11"
私は、既存事業者の適切な組み合わせを見つけることができません。私は彼らにいくつかの位置の演算子を使用するまで、それはテイク(N)またはのように、うまく機能
rx.Observable<Integer> src = rx.Observable
.from(Arrays.asList(1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11));
rx.Observable<String> res = src
.lift(new rx.Observable.Operator<String, Integer>() {
@Override
public rx.Subscriber<? super Integer> call(
final rx.Subscriber<? super String> subscriber) {
return new rx.Subscriber<Integer>(subscriber) {
private final StringBuilder cur = new StringBuilder();
@Override
public void onCompleted() {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
@Override
public void onNext(Integer i) {
if (subscriber.isUnsubscribed())
return;
cur.append(i).append('');
if (i < 0) {
subscriber.onNext(cur.toString());
cur.delete(0, cur.length());
}
}
};
}
});
rx.observers.TestSubscriber<String> sub =
new rx.observers.TestSubscriber<>();
res.subscribe(sub);
List<String> l1 = sub.getOnNextEvents();
:私のソリューションは、「チャンク」は有効であり、新しいオブジェクトを発するまでのデータを集計したカスタムオペレータを、提供することでしたcombineLatest。この状況で
rx.Observable<String> res2 = res.take(2);
私ははかのonErrorをonCompleted取得しないと、項目の少ない量が放出されます。
次のように入力すると、の次にはが出力され、にはが出力されます。
アイデア?