私は研究しており、彼らはあまり理解できません。 私が達成しようとしているのは以下の機能です:スプリング式原子炉プロジェクトを使用した反応性流れの背圧
私はSpring Reactorプロジェクトを使用していて、eventBusを使用しています。私のイベントバスがイベントをモジュールAに投げています。
モジュールAは、イベントを受信し、一意の値を保持するホットストリームに挿入する必要があります。 250ミリリスごとに、ストリームはすべての価値を引き出し、それらのストリームを評価しなければなりません。例えば
: eventBus番号でイベントを投げている:1,2,3,2,3,2
ストリームは、一意の値を取得し、保持する必要があります - > 1,2,3- 250ミリ秒後ストリームは数値と空の値を出力する必要があります
誰もが開始方法を知っていますか?私は例を試しましたが、何も実際に動作しないと私は何かを理解していないと思います。誰にも例がありますか?
TNX
EDIT:
Stream<List<Integer>> s = Streams.wrap(p).buffer(1, TimeUnit.SECONDS);
s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));
for (int i = 0; i < 10000; i++) {
p.onNext(i);
}
例外:
java.lang.IllegalStateException: The environment has not been initialized yet
at reactor.Environment.get(Environment.java:156) ~[reactor-core-2.0.7.RELEASE.jar:?]
at reactor.Environment.timer(Environment.java:184) ~[reactor-core-2.0.7.RELEASE.jar:?]
at reactor.rx.Stream.getTimer(Stream.java:3052) ~[reactor-stream-2.0.7.RELEASE.jar:?]
at reactor.rx.Stream.buffer(Stream.java:2246) ~[reactor-stream-2.0.7.RELEASE.jar:?]
at com.ta.ng.server.controllers.user.UserController.getUsersByOrgId(UserController.java:70) ~[classes/:?]
あなたは、私が進めることができません見ることができるように
私は常に例外を取得次をやろうとしていますこの問題を過ぎずに試してみてください。
これは、私がbuffer(1, TimeUnit.SECONDS)
を使用している場合にのみ発生します。たとえば、buffer(50)
を使用すると、正常に動作します。これは最終的な解決策ではありません。