未来はJavaScriptの約束と全く同じです。 Future<T>
を返すメソッドを作成し、このメソッドで非同期呼び出しを行います。あなたの非同期が完了すると、ちょうどRXは、ストリーム上で動作しますfuture.complete(<T>)
Future<JsonObject> getUsers(){
Future<JsonObject> future = Future.future();
client.getConnection(res -> {
SQLConnection connection = res.result();
connection.query("SELECT count(1) FROM T_User", res2 -> {
connection.query("SELECT count(1) FROM T_Book", res3 -> {
JsonObject rows = new JsonObject().put("res2",res2.result().getRows()).put("res2",res3.result().getRows());
future.complete(rows);
// In exception block, you can add
// future.fail(Throwable);
});
});
});
return future;
}
を呼び出します。だから、RX上記のコードでは、なります:
Observable<JsonObject> getUsers(){
return Observable.create(subscriber -> {
client.getConnection(res -> {
SQLConnection connection = res.result();
connection.query("SELECT count(1) FROM T_User", res2 -> {
connection.query("SELECT count(1) FROM T_Book", res3 -> {
JsonObject rows = new JsonObject().put("res2",res2.result().getRows()).put("res2",res3.result().getRows());
subscriber.onNext(rows);
subscriber.onCompleted();
});
});
});
}, Emitter.BackpressureMode.NONE);
}
RXのAPI
Observable<Long> userCount = mongoClient.rxCount("users", query).toObservable();
Observable<Long> booksCount = mongoClient.rxCount("books", query).toObservable();
Observable
.zip(userCount, booksCount, (userRes, booksRes) ->
new JsonObject().put("user",userRes).put("books",booksRes)
)
.subscribe(objects -> {
System.out.println(objects.getString("user"));
System.out.println(objects.getString("books"));
});
で、私はmongoClient
を使用しているが、あなたはsqlClient
と同じことを行うことができます。
_resolveコールバックが意味することは私には分かりません。 – tsegismont
@tsegismontフラットメソッド呼び出し – twogoods