2017-09-03 2 views
2

に加入し、スレッドプールにタスクを実行する簡単な例:考えるrxjava2 - 私はRxJavaまわりで私の頭を取得するには、次のタスクを試しています単一のスレッド

  • URLのリストを
  • ドゥそれぞれの結果については、スレッドプール
  • 上の各URLに対してHTTPリクエスト(ここではなしマルチスレッド)のSQLiteデータベースにいくつかのデータを挿入
  • ブロックの方法、それは
を完了するまで

は、だから私はKotlinでそれを試してみた:

val ex = Executors.newFixedThreadPool(10) 
Observable.fromIterable((1..100).toList()) 
    .observeOn(Schedulers.from(ex)) 
    .map { Thread.currentThread().name } 
    .subscribe { println(it + " " + Thread.currentThread().name } 

私はそれが

pool-1-thread-1 main 
pool-1-thread-2 main 
pool-1-thread-3 main 
pool-1-thread-4 main 
.... 

を印刷することが期待しかし、それは印刷します。

pool-1-thread-1 pool-1-thread-1 
pool-1-thread-1 pool-1-thread-1 
pool-1-thread-1 pool-1-thread-1 

誰もこれがどのように機能するかについての私の誤解を訂正することはできますか?スレッドプールのすべてのスレッドを使用しないのはなぜですか?完了するまでメインのスレッドまたはブロックでサブスクライバを実行させるにはどうすればよいですか?

答えて

3

Rxはパラレル実行サービスではないため、JavaのストリームAPIを使用します。 Rxイベントは同期しており、その後ストリームを流れます。ストリームを構築するとき、observeOnはスレッドを1回要求し、そのスレッドで1つずつエミッションを処理します。

また、subscribeがメインスレッドで実行されると予想されました。 observeOnはスレッドを切り替え、すべてのダウンストリームイベントはそのスレッドで発生します。メインスレッドに切り替える場合は、別のobserveOnsubscribeの直前に挿入する必要があります。あなたは自分のスケジューラで観測可能にそれをラップする必要があり、並列でのごmapブロック作品内のコードを作るために

1

:この場合

val ex = Executors.newFixedThreadPool(10) 
    val scheduler = Schedulers.from(ex) 
    Observable.fromIterable((1..100).toList()) 
      .flatMap { 
       Observable 
         .fromCallable { Thread.currentThread().name } 
         .subscribeOn(scheduler) 
      } 
      .subscribe { println(it + " " + Thread.currentThread().name) } 

、あなたは結果が表示されます。

pool-1-thread-1 pool-1-thread-1 
pool-1-thread-2 pool-1-thread-1 
pool-1-thread-3 pool-1-thread-1 
pool-1-thread-4 pool-1-thread-1 
... 

あなたはすることができますこの動作の説明を示すRxJava - Achieving Parallelizationのチェック文書を参照してください。

また、RxJava 2.0.5は、ParallelFlowable API

関連する問題