私はイベントを送信するシナリオをテストしていますが、消費者がフローを続けるために処理を完了したことを監視しています。 rxJavaを使用して、消費者処理の終わりまでブロックされるメインスレッドObservable、私は観察可能な結果を待つようにメインスレッドをロックすることに成功しませんでした。Observableが完了するまでスレッドをロックする方法
私のプロデューサー
@Service
public class Producer {
private MessageChannel output;
@Autowired
private Consumer consumer;
@Autowired
public Producer(Processor processor) {
this.output = processor.output();
}
public void send(String event) {
System.out.println("SENDING EVENT...");
output.send(MessageBuilder.withPayload(event).build());
//Observable<Boolean> obs = consumer.execute();
//obs.subscribe();
//Blocking process
BlockingObservable.from(consumer.execute()).subscribe();
//Continue to flow
System.out.println("EVENT PROCESSED...");
}
}
マイ消費者
@Service
public class Consumer {
@StreamListener(target = Processor.INPUT)
public void receiver(@Payload String event){
System.out.println("EVENT RECEIVED, PROCESSING...");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
execute();
}
public Observable<Boolean> execute() {
return Observable.<Boolean>create(emitter -> {
try {
System.out.println("EVENT STILL PROCESSING...");
emitter.onNext(Boolean.TRUE);
} catch (Exception e) {
emitter.onError(new RuntimeException("ERROR"));
}
emitter.onCompleted();
});
}
}