2016-12-28 3 views
0

私はRxJavaとconcat()first()演算子を使用しています:使用CONCAT()と最初の()RxJavaでキャッシュを実装する - 同時呼び出し

public Observable<List<Entity>> getEntities() { 
    invalidateCacheIfNeeded(); 
    return Observable 
      .concat(cachedEntities(), networkEntities()) 
      .first(); 
} 

cachedEntitiesをキャッシュされたリストの中から構築された観察可能を返しますnetworkEntitiesメソッドはRetrofitでエンティティを取得します。

これは、2人のユーザーがgetEntities()によって返されたオブザーバブルにすばやく登録しない限りうまくいきます。私は、最初の購読のネットワーク要求は、2番目の購読が行われたときに終了していないと思います。この場合、2つのネットワーク要求が実行されます。私は避けたいものです。

最初の呼び出しが終わったが、運にあるときに、第2のコールの実行のみが行われるので、私はシングルスレッドスケジューラを作成しようとしました:

mSingleThreadScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); 

と:

public Observable<List<Entity>> getEntities() { 
    invalidateCacheIfNeeded(); 
    return Observable 
      .concat(cachedEntities(), networkEntities()) 
      .subscribeOn(mSingleThreadScheduler) 
      .first(); 
} 

ObservableチェーンでsubscribeOnコールを振り下ろそうとしましたが、同じ結果が得られました。

ヒント?

+0

のようになりますよう

public Observable<List<Entity>> getEntities() { invalidateCacheIfNeeded(); return Observable .concat(cachedEntities(), networkEntities()) .first(); } 

あなたはAsyncSubject<Data> mSubjectを作成し、それを使用する必要があります考えますあなたのメソッドの 'getEntities()'宣言で 'synchronized'キーワードを使用してください。スレッドロックで別のスレッドを許可する前に完了するように呼び出します。 –

+0

'concat()'呼び出しがブロックされていないので、 'getEntities'はほぼ即座に戻ります。この方法では 'synchronized'メソッドは動作しません。 – fstephany

+0

しかし、 'concat()'がブロックされていなくても、 'getEntities()'を呼んでいるものは呼び出しからの戻りを待っている間にブロックされますか?だから、もしあなたが複数回呼び出されないようにしようとしていたら、この呼び出しでは 'synchronized'がうまくいくと思います。私は何かが欠けていない限り。タスクが終了していない場合、どのようにすぐに戻ることができますか?たぶん私はここでおしゃべりをすることができます。 –

答えて

0

private Observable<List<Entity>> networkEntities() { 
    return mSubject 
      .map(Data::getEntities); 
} 

に従ってくださいそして、あなたのネットワーク呼び出しは、私はあなたが簡単に使用できると考えている。この

public Observable<Data> getDataFromNetwork() { 
    return networkOperation() 
      .subscribeOn(mSingleThreadScheduler) 
      .subscribe(mSubject); 
} 
1

スレッドセーフなメソッドを作るのは良い考えではないと思います。メソッド全体をブロックするため、パフォーマンスが低下します。そのため、データ構造をスレッドセーフにすることをお勧めします。あなたのケースで あなたはあなたの方法

public Observable<List<Entity>> getEntities() { 

}

使用CopyOnWriteArrayListと代わりの一覧一覧を使用しています。スレッドセーフです。

public Observable<CopyOnWriteArrayList<Entity>> getEntities() { 

}

それが動作します願っています。

+0

問題のサブスクライブ性を反映するために、メソッド呼び出しではなく質問を更新しました。 – fstephany

0

これは、複数の加入者が観察できる比較的一般的な使用例のようです。あなたは、深さの説明では、より ためthis questionへの回答を参照してください

public Observable<List<Entity>> getEntities() { 
    invalidateCacheIfNeeded(); 
    return Observable 
      .concat(cachedEntities(), networkEntities()) 
      .first() 
      .replay(1)    
} 

ようなものが必要。

+0

いいですね。同僚は 'replay()'とPublish/Subjectも述べました。私はそれを掘り下げます。 – fstephany

関連する問題