2017-02-10 3 views
0

Observable.zipを使用して2つのリクエストの並列処理にrxjavaを使用しています。私がやろうとしているのは、1つで、observable say response私は1つの応答を得ています。observable say diff私は応答を得て、この違いをDBに保存しようとしています。問題は、私は差分を挿入しています...私はresponse observableここrxjava - 応答の取得とdiffの並列挿入

は私がやっているものです応答を取得する場合diff observableが完了取得されていないとして、私の要件を達成する方法がわからないです

public ServiceResponse getDummyResponse(ServiceRequest serviceRequest, String prodId){ 
    Observable<ServiceResponse> subInfoDummyObservable = getDummyResonseGenericObservable(); 
    Observable<ServicesDiff> reObservable = getServicesDiffGenericObservable(serviceRequest, prodId); 

    Observable<ServiceResponse> responseObservable = Observable.zip(
      subInfoDummyObservable, 
      reObservable, 
      new Func2<ServiceResponse, ServicesDiff, ServiceResponse>() { 
       @Override 
       public ServiceResponse call(ServiceResponse serviceResponse, ServicesDiff diffResponse) { 
        return serviceResponse; 
       } 
      } 
    ); 

    ServiceResponse serviceResponse = responseObservable.toBlocking().single(); 

    return serviceResponse; 
} 

Observable<ServiceResponse> getDummyResonseGenericObservable() { 
    return GenericHystrixCommand.toObservable("getDummyResonseGenericObservable", "getDummyResonseGenericObservable",() -> new ServiceResponse(),(t) -> {return null;}); 
} 

Observable<ServicesDiff> getServicesDiffGenericObservable(ServiceRequest serviceRequest, String prodId) { 
    return GenericHystrixCommand.toObservable("getServicesDiffGenericObservable", "getServicesDiffGenericObservable",() -> getBothServiceResponses(serviceRequest, prodId),(t) -> {return null;}); 
} 

public ServicesDiff getBothServiceResponses(ServiceRequest serviceRequest, String prodId) { 
    Observable<String> service1ResponseObservable = getService1GenericObservable(prodId); 
    Observable<ServiceResponse> service2ResponseObservable = getService2GenericObservable(serviceRequest, prodId); 

    Observable<ServicesDiff> observable = Observable.zip(
      service1ResponseObservable, service2ResponseObservable, 
      new Func2<String, ServiceResponse, ServicesDiff>() { 
       @Override 
       public ServicesDiff call(String service1Response, ServiceResponse service2Response) { 
        return aggregate(service1Response, service2Response); // never reaches this point********** 
       } 
      } 
    ); 
    ServicesDiff response = observable.toBlocking().single(); 

    return response; 
} 

ですDBはaggregateメソッドですが、aggregateには決して達しません。ここで間違っていることを教えてください。ありがとう。

+0

あなたのコードサンプルが何であるか、私には明らかではないが、問題のあるgetBothServiceResponsesメソッドと他のコードとの関係 getBothServiceResponsesでジップしている2つの観測値は何ですか? – yosriz

+0

私は同意する、コードは明確化する必要があります。最初の3つのメソッドは決して呼び出されないので、問題のある第4のメソッド 'getBothServiceResponses()'では、observablesがどのように見えるか分かりません – nosyjoe

答えて

0

Observableは、データの消費方法の説明です。コードサンプルでは、​​購読しないで、実際にデータを消費しません。あなたは要求方法を説明しましたが、要求をトリガする部分である購読部分はありません。

私は少しあなたのコード書き換えるのであれば:

class Aggregate { 
    Aggregate(String reponse, ServicesDiff diff) { 
     ... 
    } 
} 

Observable<String> getService1GenericObservable(String prodId) { 
    ... 
} 

Observable<ServicesDiff> getServicesDiffGenericObservable(ServiceRequest serviceRequest, String prodId) { 
    ... 
} 

public Observable<Aggregate> getBothServiceResponses(ServiceRequest serviceRequest, String prodId) { 
    Observable<String> service1ResponseObservable = getService1GenericObservable(prodId); 
    Observable<ServiceResponse> service2ResponseObservable = getService2GenericObservable(serviceRequest, prodId); 

    return Observable<Aggregate> observable = Observable.zip(
      service1ResponseObservable, service2ResponseObservable, 
      new Func2<String, ServiceResponse, ServicesDiff>() { 
       @Override 
       public ServicesDiff call(String service1Response, ServiceResponse service2Response) { 
        return aggregate(service1Response, service2Response); 
       } 
      } 
    ); 
} 

をあなただけの結果の集計にアクセスするには、この操作を行う必要があります:

getBothServiceResponses(serviceRequest, prodId).subscribe(...) 
関連する問題