2016-10-20 4 views
2

Observerは、サブスクライバに関係なく、論理が一度(非同期に)実行できる方法を教えてください。私のユースケースは、別のオブジェクト(SqliteOpenHelper)に委譲してModelクラスのCRUDを実行するProviderクラスです。 ModelProvider.updateへの呼び出しは、UIが購読しているかどうかにかかわらず、モデルの変更を維持する必要があります。オプションの加入者で1回実行可否

class ModelProvider { 
    Observable<String> updateModel(Model model) { 
     return Observable.create((Subscriber<? super String> subscriber) -> { 
      // synchronous for now, probably irrelevant 
      String modelId = repository.save(model); 
      subscriber.onNext(modelId); 
     }).subscribeOn(Schedulers.io()); 
    } 
} 

class SomeActivity { 
    void updateModel(Model model) { 
     // dont care about the result, but need it to execute exactly once 
     mModelProvider.updateModel(model); 
    } 
} 

私はrepository.save(model)への呼び出しが同期であるのでRxJavaと反応性パターンを使用してコードベースの移動の途中でだけど、まだUIの加入の有無にかかわらず、その後一度だけ実行するロジックが必要になります。

私は単純に、観察のうち、リポジトリのロジックを移動し、その結果を使用して、異なる観測可能を返すと考えてきました:

class ModelProvider { 
    Observable<String> updateModel(Model model) { 
     String modelId = repository.save(model); 

     return Observable.create((Subscriber<? super String> subscriber) -> { 
      subscriber.onNext(modelId); 
     }).subscribeOn(Schedulers.io()); 
    } 
} 

が、これは、メインスレッドをブロック、それにIOの仕事を強制的に。ここ

別の非ソリューション:UIがそれに加入している場合、これが再びobs.subscribe()に一度実行しますが実行さん

Observable<String> updateModel(Model model) { 
     Observable<String> obs = Observable.create((Subscriber<? super String> subscriber) -> { 
     String modelId = repository.save(model); 
      subscriber.onNext(modelId); 
     }).subscribeOn(Schedulers.io()); 

     obs.subscribe(); 

     return obs; 
    } 

私はちょうど反応パターンを学習していますが、これはSubjects/Relaysが有用であると仮定しています。それは、再起動防止を作成後

Observable<String> updateModel(Model model) { 
    Observable<String> obs = Observable.create((Subscriber<? super String> subscriber) -> { 
     String modelId = repository.save(model); 
     subscriber.onNext(modelId); 
    }).subscribeOn(Schedulers.io()) 

    // cached() returns a new observer over the original, which only executes once 
    // but not until it is subscribed to 
    Observable<String> cachedObs = obs.cache(); 
    // this "starts" execution of `repository.save()` 
    cachedObs.subscribe(); 

    // return observable that executes once and caches (cachedObs) 
    // returning the original (obs) would cause multiple executions 
    return cachedObs; 
} 

Observable.cache()を呼び出しますが、obs.subscribe()への呼び出しは、その1-と専用1呼び出しを取得するために必要とされている:

答えて

1

これは動作します。

サブスクリプションで永続性ブロック(repository.save()など)がすぐに開始されるのではなく、未定義の時点で別のスレッド(Schedulers.ioスレッドプール)で非同期に実行されるようにキューに入れられます。


これはrxjava-async-utilsパッケージを使用しても動作します:cache()への呼び出しは、結果を保存して、加入者にそれを提供するために必要であることを

Observable<String> updateModel(Model model) { 
    Observable<String> obs = Async.start(() -> repository.save(model)); 

    // cached() here is required in order to avoid the race condition 
    Observable<String> cachedObs = obs.cache(); 

    return cachedObs; 
} 

注意を。それがなければ、Async.startブロックが既に完了しているため、加入者に結果が得られない競合状態が存在します。

1

@xstは言ったが、私はそれを簡素化したいとより:

return Observable 
    .defer(() -> Observable.just(repository.save(model))) 
    .subscribeOn(Schedulers.io()) 
    .cache(); 
+0

これは応答をキャッシュしない(および再実行されません)が、ない加入者が存在しない場合は実行されません。 – xst

関連する問題