私はリアクターライブラリーを試していましたが、onNextまたはonComplete呼び出しで戻ってくるモノが決して戻ってこない理由は分かりません。私は非常に些細な事を欠いていると思う。ここにサンプルコードがあります。購読中のモノでonNextとonCompleteコールを受け取ることができません
MyServiceService service = new MyServiceService();
service.save("id")
.map(myUserMono -> new MyUser(myUserMono.getName().toUpperCase(), myUserMono.getId().toUpperCase()))
.subscribe(new Subscriber<MyUser>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("Subscribed!" + Thread.currentThread().getName());
}
@Override
public void onNext(MyUser myUser) {
System.out.println("OnNext on thread " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable t) {
System.out.println("onError!" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onCompleted!" + Thread.currentThread().getName());
}
});
}
private static class MyServiceService {
private Repository myRepo = new Repository();
public Mono<MyUser> save(String userId) {
return myRepo.save(userId);
}
}
private static class Repository {
public Mono<MyUser> save(String userId) {
return Mono.create(myUserMonoSink -> {
Future<MyUser> submit = exe.submit(() -> this.blockingMethod(userId));
ListenableFuture<MyUser> myUserListenableFuture = JdkFutureAdapters.listenInPoolThread(submit);
Futures.addCallback(myUserListenableFuture, new FutureCallback<MyUser>() {
@Override
public void onSuccess(MyUser result) {
myUserMonoSink.success(result);
}
@Override
public void onFailure(Throwable t) {
myUserMonoSink.error(t);
}
});
});
}
private MyUser blockingMethod(String userId) throws InterruptedException {
Thread.sleep(5000);
return new MyUser("blocking", userId);
}
}
上記のコードのみがSubcribed!mainを出力します。その将来のコールバックがmyUserMonoSink.success