2017-03-29 18 views
1

私はzipオペレーションで問題に直面しています。私はZipオペレータと組み合わせているObservablesを3つ持っています。問題は、サブスクライブコードの文が実行されないことがあることがあります。 zip演算子は、すべてのobservableがイベントを送出するのを待つことは想定されていません。以下はサンプルコードです。ZipオペレータでRxJavaの問題

System.out.println("Main Method ********** " + Thread.currentThread().getName()); 
    sleep(); 

プリント:

import java.util.Date; 

import rx.Observable; 
import rx.schedulers.Schedulers; 
public class ZipRxJava { 

    public static void main(String[] args) { 
     ZipRxJava z = new ZipRxJava(); 
     Observable<CartPlanResponse> o1 = Observable.<CartPlanResponse>create(sub -> sub.onNext(createPlanResponse(z))).subscribeOn(Schedulers.io()); 
     Observable<CartFeatureResponse> o2 = Observable.<CartFeatureResponse>create(sub -> sub.onNext(createFeatureResponse(z))).subscribeOn(Schedulers.io()); 
     Observable<CartAccessoriesResponse> o3 = Observable.<CartAccessoriesResponse>create(sub -> sub.onNext(createAccessoriesResponse(z))).subscribeOn(Schedulers.io()); 
     Observable.zip(o1, o2, o3, (p1, p2, p3) -> { 
      System.out.println("Inside Transformer $$$$$$$$$$$››››" + Thread.currentThread().getName()); 
      Response res = z.new Response(); 
      res.setPlanResponse(p1); 
      res.setFeatureResponse(p2); 
      res.setAccesoriesResponse(p3); 
      return res; 
     }).subscribe(r1 -> System.out.println("&&&&&&&&&&&"+ Thread.currentThread().getName() + "*******" + r1.getPlanResponse().getPlanId() + " " + r1.getFeatureResponse().getFeatureId() + " " + 
      r1.getAccesoriesResponse().getAccessoryId()), e1 -> System.out.println("Error")); 
     System.out.println("Main Method ********** " + Thread.currentThread().getName()); 
    } 


    private static CartPlanResponse createPlanResponse(ZipRxJava z) { 
     System.out.println("Plan ********** " + Thread.currentThread().getName()); 
     CartPlanResponse res = z.new CartPlanResponse(); 
     res.setPlanId("123"); 
     System.out.println("Before Return Plan ********** " + Thread.currentThread().getName()); 
     return res; 
    } 

    private static CartFeatureResponse createFeatureResponse(ZipRxJava z) { 
     System.out.println("Feature ********** " + Thread.currentThread().getName()); 
     //sleep(); 
     int y =0; 
     for (int i =0 ; i <100000000; i++) { 
      y +=i; 
     } 
     CartFeatureResponse res = z.new CartFeatureResponse(); 
     res.setFeatureId("345"); 
     System.out.println("Before Return Feature ********** " + Thread.currentThread().getName()); 
     return res; 
    } 

    private static CartAccessoriesResponse createAccessoriesResponse(ZipRxJava z) { 
     System.out.println("Accessories ********** " + Thread.currentThread().getName()); 
     CartAccessoriesResponse res = z.new CartAccessoriesResponse(); 
     res.setAccessoryId("567"); 
     System.out.println("Before Return Accessories ********** " + Thread.currentThread().getName()); 
     return res; 
    } 

    private static void sleep() { 
     try { 
      Thread.sleep(5000); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    private class CartPlanResponse { 
     String planId; 

     public String getPlanId() { 
      return planId; 
     } 

     public void setPlanId(String planId) { 
      this.planId = planId; 
     } 
    } 

    private class CartFeatureResponse { 
     private String featureId; 

     public String getFeatureId() { 
      return featureId; 
     } 

     public void setFeatureId(String featureId) { 
      this.featureId = featureId; 
     } 
    } 

    private class CartAccessoriesResponse { 
     private String accessoryId; 

     public String getAccessoryId() { 
      return accessoryId; 
     } 

     public void setAccessoryId(String accessoryId) { 
      this.accessoryId = accessoryId; 
     } 
    } 

    private class Response { 
     private CartPlanResponse planResponse; 
     private CartFeatureResponse featureResponse; 
     private CartAccessoriesResponse accesoriesResponse; 
     public CartPlanResponse getPlanResponse() { 
      return planResponse; 
     } 
     public void setPlanResponse(CartPlanResponse planResponse) { 
      this.planResponse = planResponse; 
     } 
     public CartFeatureResponse getFeatureResponse() { 
      return featureResponse; 
     } 
     public void setFeatureResponse(CartFeatureResponse featureResponse) { 
      this.featureResponse = featureResponse; 
     } 
     public CartAccessoriesResponse getAccesoriesResponse() { 
      return accesoriesResponse; 
     } 
     public void setAccesoriesResponse(CartAccessoriesResponse accesoriesResponse) { 
      this.accesoriesResponse = accesoriesResponse; 
     } 

    } 

答えて

0

あなたはRxJavaスケジューラが停止し、メインスレッドがあなたのケースで終了した場合に作業を実行していないデーモンスレッド上で実行されるため、メインの方法でスリープ状態に持って

Plan ********** RxIoScheduler-2 
Before Return Plan ********** RxIoScheduler-2 
Feature ********** RxIoScheduler-3 
Main Method ********** main 
Accessories ********** RxIoScheduler-4 
Before Return Accessories ********** RxIoScheduler-4 
Before Return Feature ********** RxIoScheduler-3 
Inside Transformer $$$$$$$$$$$››››RxIoScheduler-3 
&&&&&&&&&&&RxIoScheduler-3*******123 345 567 
+0

私はWebアプリケーションから実行しているため、スリープメソッドを使用できません。 3つのObservableコードがすべて他のマイクロサービスを呼び出しています。Observable 2のコードにデバッガを置くとサービスが応答を返しますが、Observable 2のサービスが利用できないエラー応答が返されます。デバッガでは、zipオペレーションはObservable 2がメッセージを出すのを待っていますが、デバッガなしではzipオペレーションはObservable 2がエラーであると仮定してエラーを返しています。 – Yogi

+0

この要件は問題ではありませんでしたか?普通の 'subscribe()'の代わりに、 'toBlocking()。subscribe()'のようにブロックする必要があります。 – akarnokd

関連する問題