RxJavaのバックプレッシャは実際のバックプレッシャではなく、いくつかの要素セットを無視するだけです。RxJavaでのREALバックプレッシャの最適実装
しかし、私は要素を緩めることができないと私は何とかエミションを遅くする必要がありますか?
RxJavaは要素のエミションには影響しません。したがって、開発者はそれを単独で実装する必要があります。しかしどうですか?
最も簡単な方法は、エミションをインクリメントして仕上げ時にデクリメントするカウンタを使用することです。そのような
:
public static void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.create(s -> {
while (!s.isUnsubscribed()) {
if (counter.get() < 100) {
s.onNext(Math.random());
counter.incrementAndGet();
} else {
sleep(100);
}
}
}).subscribeOn(sA)
.flatMap(r ->
Observable.just(r)
.subscribeOn(sB)
.doOnNext(x -> sleep(1000))
.doOnNext(x -> counter.decrementAndGet())
)
.subscribe();
}
しかし、私はこの方法は非常に貧弱であると思います。もっと良い解決策はありますか?
キューが背圧ない:RxJavaは合併症なくて背圧対応のフローを与えるための方法がたくさんあります。バックプレッシャのポイントは、排出量を減速させるために生産者に合図することです。それは常に可能ではないかもしれませんが、大多数の場合、そうです。このため、明示的なバックプレッシャはJDK9の新しいリアクティブストリーム仕様に含まれています。 https://github.com/reactive-streams/reactive-streams-jvm – kaqqao