2016-02-16 14 views
7

は私がRxJava観測APIを使用して、次のコードを持っています:RxJava観察者コードの並列実行

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath()); 
    observable 
     .buffer(10000) 
     .observeOn(Schedulers.computation()) 
     .subscribe(recordInfo -> { 
     _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId()); 
      for(Info info : recordInfo) { 
      // some I/O operation logic 
     } 
     }, 
     exception -> { 
     }, 
    () -> { 
     }); 

私の期待は、サブスクライブ()メソッド内の観察コード、すなわちコードはIの後に並列に実行されるということでした計算スケジューラを指定しました。その代わりに、コードは依然として単一のスレッドで順次実行されています。 RxJava APIを使用してコードを並列実行する方法

答えて

26

RxJavaは、非同期/マルチスレッドの側面で誤解されることがよくあります。マルチスレッド操作のコーディングは簡単ですが、抽象を理解することも別のことです。

RxJavaに関するよくある質問は、並列化を達成する方法、またはObservableから複数の項目を同時に放出することです。もちろん、この定義はObservable Contractを破ります.Onservable Contractは、onNext()を連続して呼び出さなければならず、一度に複数のスレッドによって同時に呼び出されなければならないと述べています。

並列処理を実現するには、複数のObservablesが必要です。

これは、シングルスレッドで動作します:

これは、複数のスレッドで実行さ
Observable<Integer> vals = Observable.range(1,10); 

vals.subscribeOn(Schedulers.computation()) 
      .map(i -> intenseCalculation(i)) 
      .subscribe(val -> System.out.println("Subscriber received " 
        + val + " on " 
        + Thread.currentThread().getName())); 

:あなたは、その目的のためにsubscribeOn(Schedulers.computation())の代わりobserveOn(Schedulers.computation())を指定する必要が

Observable<Integer> vals = Observable.range(1,10); 

vals.flatMap(val -> Observable.just(val) 
      .subscribeOn(Schedulers.computation()) 
      .map(i -> intenseCalculation(i)) 
).subscribe(val -> System.out.println(val)); 

コードとテキストcomes from this blog post.

+1

rxjavaでのマルチスレッドコーディングに最適です。 –

2

subscribeOnでは、値を発行するスレッドを宣言します。 observeOnでは、どのスレッドで処理し、監視するかを宣言します。

0

これは同じシーケンスです。新しいスレッドでさえも

Observable ob3 = Observable.range(1,5);

ob3.flatMap(new Func1<Integer, Observable<Integer>>() { 

     @Override 
     public Observable<Integer> call(Integer pArg0) { 

      return Observable.just(pArg0); 
     } 

    }).subscribeOn(Schedulers.newThread()).map(new Func1<Integer, Integer>() { 

     @Override 
     public Integer call(Integer pArg0) { 

      try { 
       Thread.sleep(1000 - (pArg0 * 100)); 
       System.out.println(pArg0 + " ccc " + Thread.currentThread().getName()); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 

      return pArg0; 
     } 

    }).subscribe(); 

出力 1 CCCのRxNewThreadScheduler-1 2 CCCのRxNewThreadScheduler-1 3 CCCのRxNewThreadScheduler-1 4 CCCのRxNewThreadScheduler-1 5 CCCのRxNewThreadScheduler-1

0

flatMapを使用し、Schedulers.computation()にサブスクライブするために指定並行性を実現します。

Callableを使用したより実用的な例です。出力から、すべてのタスクを終了するのに約2000ミリ秒かかることがわかります。

static class MyCallable implements Callable<Integer> { 

    private static final Object CALLABLE_COUNT_LOCK = new Object(); 
    private static int callableCount; 

    @Override 
    public Integer call() throws Exception { 
     Thread.sleep(2000); 
     synchronized (CALLABLE_COUNT_LOCK) { 
      return callableCount++; 
     } 
    } 

    public static int getCallableCount() { 
     synchronized (CALLABLE_COUNT_LOCK) { 
      return callableCount; 
     } 
    } 
} 

private static void runMyCallableConcurrentlyWithRxJava() { 
    long startTimeMillis = System.currentTimeMillis(); 

    final Semaphore semaphore = new Semaphore(1); 
    try { 
     semaphore.acquire(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable()) 
      .flatMap(new Function<MyCallable, ObservableSource<?>>() { 
       @Override 
       public ObservableSource<?> apply(@NonNull MyCallable myCallable) throws Exception { 
        return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation()); 
       } 
      }) 
      .subscribeOn(Schedulers.computation()) 
      .subscribe(new Observer<Object>() { 
       @Override 
       public void onSubscribe(@NonNull Disposable d) { 

       } 

       @Override 
       public void onNext(@NonNull Object o) { 
        System.out.println("onNext " + o); 
       } 

       @Override 
       public void onError(@NonNull Throwable e) { 

       } 

       @Override 
       public void onComplete() { 
        if (MyCallable.getCallableCount() >= 4) { 
         semaphore.release(); 
        } 
       } 
      }); 


    try { 
     semaphore.acquire(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } finally { 
     semaphore.release(); 
    } 
    System.out.println("durationMillis " + (System.currentTimeMillis()-startTimeMillis)); 
} 
関連する問題