-1
rxjavaで境界線が開いていて、clossingしているバッファで作業しようとしていますが、うまく動作しません。 0,1,2,3,0,1,3,0,3私は{0,1,2,3}、{0,1}で終わりたいと思っています。 、3}、{0,3}である。rxjavaで境界線が開いていて、clossingしているバッファ
これは私がこれまでしているコードです:
PublishSubject openning = PublishSubject.create();
openning.doOnNext(new Consumer() {
@Override
public void accept(@NonNull Object o) throws Exception {
if(o.equals("0"))
openning.onNext(o);
}
});
Observable<String> observableA = Observable.interval(1, TimeUnit.SECONDS).map(value -> String.valueOf(value % 10));
observableA.subscribe(openning);
// TODO: Buffer by boundary
observableA = observableA.buffer(openning, new Function<String, Observable<List<String>>>() {
@Override
public Observable<List<String>> apply(@NonNull String o) throws Exception {
list.add(o);
if (o.equals("0")) {
return Observable.just(list);
} else {
list.add(o);
sb.append(o);
return Observable.never();
}
}
}, new Callable() {
@Override
public Object call() throws Exception {
return list;
}
});
すべてのヘルプは、あなたはバッファが3に遭遇した後に再起動する必要があることを意味している場合、のextensions projectでbufferUntil
オペレータを見
ありがとうございます。私が望まないシーケンス間にデータがあるかもしれないという事実を忘れました。しかし、あなたの提案のおかげで、私はbufferWhileでそれを行うことができることを発見しました。 –