2016-03-11 10 views
20

私はをRetrofitサービスから作成し、 .getFoo()メソッドを呼び出した後、複数のサブスクライバと共有する必要があります。ただし、.share()メソッドを呼び出すと、ネットワークコールが再実行されます。リプレイオペレータは動作しません。私は潜在的な解決策が.cache()かもしれないことを知っていますが、なぜこの動作が引き起こされるのかわかりません。複数の利用者が閲覧可能なシングル

Retrofit retrofit = new Retrofit.Builder() 
      .baseUrl(API_URL) 
      .addConverterFactory(GsonConverterFactory.create()) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build(); 

    // Create an instance of our GitHub API interface. 

    // Create a call instance for looking up Retrofit contributors. 

    Observable<List<Contributor>> testObservable = retrofit 
      .create(GitHub.class) 
      .contributors("square", "retrofit") 
      .share(); 


    Subscription subscription1 = testObservable 
    .subscribe(new Subscriber<List<Contributor>>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable throwable) { 

     } 

     @Override 
     public void onNext(List<Contributor> contributors) { 
      System.out.println(contributors); 
     } 
    }); 

    Subscription subscription2 = testObservable 
      .subscribe(new Subscriber<List<Contributor>>() { 
       @Override 
       public void onCompleted() { 

       } 

       @Override 
       public void onError(Throwable throwable) { 

       } 

       @Override 
       public void onNext(List<Contributor> contributors) { 
        System.out.println(contributors + " -> 2"); 
       } 
      }); 

    subscription1.unsubscribe(); 
    subscription2.unsubscribe(); 

上記のコードは、前述の動作を再現できます。それをデバッグして、受け取ったListが別のMemoryAddressに属していることを確認できます。

私は潜在的な解決策としてConnectable Observablesも見てきましたが、元の観測可能性を持ち、新しいサブスクライバを追加するたびに.connect()を呼び出す必要があります。

この種の動作は.share()であり、Retrofit 1.9までは正常に動作していました。それはRetrofit 2 - betaでの作業を停止しました。私は数時間前にリリースされたRetrofit 2 Release Versionでまだテストしていません。

EDIT:将来の読者のために2017年1月2日

、私はケースの詳細を説明する記事hereを書かれています!

答えて

23

ConnectedObservableさんが.share()さんから返された投稿を通常のObservableに(暗黙のうちに)キャストしているようです。あなたは温かいものと寒いものとの違いを読みたいかもしれません。

ConnectedObservable<List<Contributor>> testObservable = retrofit 
     .create(GitHub.class) 
     .contributors("square", "retrofit") 
     .share(); 

Subscription subscription1 = testObservable 
    .subscribe(new Subscriber<List<Contributor>>() { 
    @Override 
    public void onCompleted() { 

    } 

    @Override 
    public void onError(Throwable throwable) { 

    } 

    @Override 
    public void onNext(List<Contributor> contributors) { 
     System.out.println(contributors); 
    } 
}); 

Subscription subscription2 = testObservable 
     .subscribe(new Subscriber<List<Contributor>>() { 
      @Override 
      public void onCompleted() { 

      } 

      @Override 
      public void onError(Throwable throwable) { 

      } 

      @Override 
      public void onNext(List<Contributor> contributors) { 
       System.out.println(contributors + " -> 2"); 
      } 
     }); 

testObservable.connect(); 
subscription1.unsubscribe(); 
subscription2.unsubscribe(); 

編集を試してみてください:あなたはあなただけそれが観測を起動する必要があり、新しいサブスクリプションをするたびconnect()を呼び出す必要はありません。私はあなたがそれ以降のすべての加入者は、すべての項目は、私がここで何が起こっていたかの十分な説明を提案したいと思いRxJava開発者のDavid Karnokに戻って確認した後

ConnectedObservable<List<Contributor>> testObservable = retrofit 
     .create(GitHub.class) 
     .contributors("square", "retrofit") 
     .share() 
     .replay() 
+0

ありがとうございました。問題は、毎回connectを呼び出すことを避けたいということです。リプレイ演算子がこのユースケースで正常に動作することは確かですか? – Pavlos

+0

実際に私はそれをテストし、それは働いた。お時間をいただきありがとうございます。この問題のために、私は暑い気候と寒い気候の違いを読んだが、Retrofitのネットワークコールではそれを再現できませんでした。 Observable.just()を使用した場合、共有演算子はかなりうまく動作していました。 – Pavlos

30

を生産し得ることを確認するためにreplay()を使用することができたとします。

share()は、publish().refCount()と定義されます。 e。ソースObservableは、publish()によって最初にConnectableObservableに変換されますが、connect()を "手動で"呼び出すのではなく、refCount()によって処理されます。特に、refCountは、ConnectableObservable上でconnect()を呼び出し、それ自体が最初のサブスクリプションを受け取ると、少なくとも1人の加入者がいる限り、加入したままになります。最後に、加入者の数が0に減少すると、加入者は上向きに加入解除されます。 寒いObservablesは、Retrofitによって返されたものと同様に、実行中の計算をすべて停止します。

これらのサイクルの1つの後に別のサブスクライバが来る場合、refCountは再びconnectを呼び出し、ソースObservableへの新しいサブスクリプションをトリガします。この場合、別のネットワーク要求がトリガーされます。

これらの古いバージョンのRetrofitでは、デフォルトですべてのネットワーク要求が別のスレッドに移動されたため、これはRetrofit 1(および実際にはthis commitより前のバージョン)では明らかになりませんでした。これは、通常、最初のリクエスト/ Observableがまだ実行されていたためにsubscribe()コールがすべて発生することを意味し、したがって新しいSubscriberrefCountに追加され、追加リクエスト/ Observablesをトリガしません。

しかし、新しいバージョンのRetrofitでは、デフォルトでは別のスレッドに作業を移動することはありません。たとえば、subscribeOn(Schedulers.io())などを呼び出すことで明示的に行う必要があります。そうしないと、すべてがちょうど最初ObservableonCompletedと呼ばれ、したがって、すべてのSubscribers後に解除を持っており、すべてがシャットダウンされた後、第2 subscribe()にのみ発生しますことを意味し、現在のスレッドに滞在します。さて、第1段落で見たように、第2のsubscribe()が呼び出されたとき、share()は、別のネットワーク要求を引き起こすために別のSubscriptionを送信元Observableに引き起こし、別のネットワーク要求を引き起こします。

Retrofit 1の動作に戻るには、subscribeOn(Schedulers.io())を追加してください。

この結果、ほとんどの場合、ネットワーク要求のみが実行されます。しかし、基本的には、ネットワークリクエストが非常に速い場合や、subscribe()コールがかなり遅れて発生した場合にのみ、複数のリクエストを受け取ることができます(また、Retrofit 1を使用することもできます)。 2番目のsubscribe()が発生すると終了します。

したがって、Dávidは、cache()(ただし、あなたが言及した欠点を持っています)またはreplay().autoConnect()のいずれかを使用することを提案しています。加入者が失われたとき、それは を切断しないことを除いて、これらのrelease notesによると、autoConnectrefCountの前半だけのように動作し、より正確に、それは、)参照カウント(と行動の

似ています。

これは最初subscribe()が発生したときに要求にのみトリガーされる手段が、その後のすべての後Subscriber Sに関わらず0加入者との間における任意の時点で、あったかどうか、全て放出されたアイテムを受け取ることになります。

+1

あなたの詳細な説明をいただきありがとうございます。私はこの問題のためのいくつかの快適な解決策を持っていると確信していますが、確かにそれを心に留めてください:) – Pavlos

+2

素晴らしい!私は '再生 '、'共有 '、'公開 'などが十分に複雑であり、そこでの辺の事例の詳細な説明を傷つけないので、この説明を追加したかっただけです。 –

+2

本当に素晴らしい答え:) –

関連する問題