2017-04-12 23 views
0

Observables + Couchbase async apiから脳爆発が起こったと思います。 誰かが私を助けてくれますか? バッチ操作で数日間戦ってきましたが、適切なエラー処理でバッチ操作を行う方法を理解できません。Couchbase非同期バッチエラー処理

たとえば、Couchbaseの一部のドキュメントを一括して更新したいとします。私は同期APIを使用し 場合、それは次のようになります。

List< JsonDocument> items = getItems(1, 2, 3, 4, 5); 
// getItems - some method which calls bucket.get() for specified keys 

for (JsonDocument item : items) { 
    try { 
     try { 
     item.content().put("age", 42); 
     bucket.replace(item); 
     } catch (CASMismatchException e) { 
     // retry 
     bucket.get(item.id()).content().put("age", 42); 
     bucket.replace(item); 
     } 
    } catch (Exception e) { 
     // handle error which doesn't stop execution for other items 
     // for example, add item id to list of failed items in response 
     errorHandler.handleError(item.id(), e); 
    } 
} 

しかし、これは平行ではなく、およびドキュメンテーションは、非同期APIがより効率的であると言います。 私が理解することはできませんが観測を通じて、このような流れを作成する方法ですが、私は試してみました:

Observable.from(items) 
.flatMap(item -> { 
    item.content().put("age", 42); 
    return bucket.async().replace(item); 
}) 
.onErrorResumeNext(error -> { 
    // what to do? return another observable which does retry logic above? 
    // how do I know what item has failed? 
    // I don't have ID of that item, nor I can extract it from passed Exception 
    // why onErrorResumeNext is getting called only once (if one item fails) 
    // and is not called for other items? 
}) 
.subscribe(); // also need Subscriber with onError (otherwise there are warnings in log) 

すべてのヘルプははるかに高く評価されます! おかげ

+1

Observableを使ってObservableをビルドする必要があると思います。試しにキャッチしてから再試行してください。再試行してもエラーが出ない場合は、このアイテムを出してください。 –

答えて

0

あなたはこのような何かを行うことができます。

Observable.from(items) 
      .flatMap(item -> { 
       item.content().put("age", 42); 
       return bucket.async() 
         .replace(item) 
         .retry((count, throwable) -> count == 1 && throwable instanceof CASMismatchException) 
         .onErrorReturn(e -> { 
          errorHandler.handleError(item.id(), e); 
          return null; //or item, if you need the item further down the stream 
         }) 
         .subscribeOn(Schedulers.io()); //not sure if it's needed with bucket.async() 
      }) 
      .subscribeOn(<something>) //with this scheduler the put() method will be executed 
      .subscribe(); 

アイデアは、各再試行ロジックがないストリーム全体のために、単一の項目のためであるとして、flatMap()を経由して別の観察可能に取り扱い、各項目を分離することです。 リトライ演算子はリトライ回数と例外を与える述語で動作しますので、最初に特定のCASMismatchException例外を指定して再試行し、エラーの場合はonErrorReturnを実行して他のエラーを処理しても返すことができますそのアイテムを処理し続ける場合
注意すべき点はスケジューラです。コールを実行するときにCouchbaseがデフォルトでio()に動作するかどうかはわかりません。また、このラインと考えて:

item.content().put("age", 42); 

は、それがメインストリームのサブスクリプション・スケジューラ上で行われるよう、最後subscribeOn()上で実行されます。

+0

あなたの助けてくれてありがとう!私はあなたの助言に従おうとします。私が説明したケースは非常に単純化されていますが、ObservableのリストではなくObservableというアイテムでエラーをキャッチするというアイデアは、私が本当に逃したものです。 – blackdigger