Observables + Couchbase async apiから脳爆発が起こったと思います。 誰かが私を助けてくれますか? バッチ操作で数日間戦ってきましたが、適切なエラー処理でバッチ操作を行う方法を理解できません。Couchbase非同期バッチエラー処理
たとえば、Couchbaseの一部のドキュメントを一括して更新したいとします。私は同期APIを使用し 場合、それは次のようになります。
List< JsonDocument> items = getItems(1, 2, 3, 4, 5);
// getItems - some method which calls bucket.get() for specified keys
for (JsonDocument item : items) {
try {
try {
item.content().put("age", 42);
bucket.replace(item);
} catch (CASMismatchException e) {
// retry
bucket.get(item.id()).content().put("age", 42);
bucket.replace(item);
}
} catch (Exception e) {
// handle error which doesn't stop execution for other items
// for example, add item id to list of failed items in response
errorHandler.handleError(item.id(), e);
}
}
しかし、これは平行ではなく、およびドキュメンテーションは、非同期APIがより効率的であると言います。 私が理解することはできませんが観測を通じて、このような流れを作成する方法ですが、私は試してみました:
Observable.from(items)
.flatMap(item -> {
item.content().put("age", 42);
return bucket.async().replace(item);
})
.onErrorResumeNext(error -> {
// what to do? return another observable which does retry logic above?
// how do I know what item has failed?
// I don't have ID of that item, nor I can extract it from passed Exception
// why onErrorResumeNext is getting called only once (if one item fails)
// and is not called for other items?
})
.subscribe(); // also need Subscriber with onError (otherwise there are warnings in log)
すべてのヘルプははるかに高く評価されます! おかげ
Observableを使ってObservableをビルドする必要があると思います。試しにキャッチしてから再試行してください。再試行してもエラーが出ない場合は、このアイテムを出してください。 –