2016-11-19 4 views
2

基本的には、一度に乱数を生成するプロデューサーと、1秒間スリープして数値を出力する複数の消費者があります。rxjavaでバランスファンアウトを実装する方法は?

すべての消費者は排他的ですが、すべての人が1つの受信者しか持てません。
この動作は、javaのJMSキューまたはBlockingQueueに似ています。アッカストリームで

、私は

balance[T] – (1 input, N outputs) given an input element emits to one of its output ports. 

を見つけることができますしかし、私はrxjava内の任意の組み込みコンポーネントを見つけることができない同じ仕事をします。
観測者は常にpub-subのようなオブザーバーにメッセージをブロードキャストします。 queueスタイルが必要な場合はどうすればいいですか?

私は何かお見逃しですか?

+0

あなたのユースケースは何ですか?要素の処理を並列化しますか? – akarnokd

+0

はい、要素の処理を並列化したいと思います。私は、複数のスレッドとブロックキューでこれを行うことができますが、私はこれをよりエレガントにするのだろうかと思います。 プロデューサはデータベースからフェッチしたイベントと、これらのイベントで時間のかかる仕事をしている多数の消費者を放出しています。これらの消費者はまったく同じです。 –

答えて

1

あなたが持っている精神モデルは、Rxが構築しているものと実際には一致しないと思います。大規模なコンポーネント間のメッセージではなく、多くの小さな操作のストリームを考えると思います。

I a)はキャップしたスレッドプールBをお勧めしたい)が、次にCを中心RXスケジューラ:

databaseSource 
.fetchItems() 
.flatMap(item -> 
    Obsevable.just(item) 
    .observeOn(cappedThreadScheduler) 
    .map(item -> longRunningOperation(item)) 
) 

OTOH、あなたも、このようにそれを行うことができます。

databaseSource 
.fetchItems() 
.flatMap(item -> 
    Obsevable.just(item) 
    .observeOn(schedulers.io()) 
    .map(item -> longRunningOperation(item)) 
    , 16 
) 

へ並行して実行される操作は最大16個です。

+0

こんにちは、私はhttp://stackoverflow.com/questions/40701696/is-it-ok-to-transform-following-code-in-rxjavaで自分のコードを更新します。実行可能なコードで助けてくれますか? –

関連する問題