2012-07-04 15 views
7

同じInputStreamを同時に処理するN個のコンシューマスレッドを生成する必要があります。たとえば、何らかの形で変換し、チェックサムやデジタル署名などを計算します。これらのコンシューマは、データソースとしてInputStreamを受け入れる第三者のライブラリを使用しています。独立したコンシューマでの単一のInputStreamの同時処理

は、だから私は何ができるかである - 「親」ストリームからのデータ

  • ブロック解除消費者の

    • 読み取りチャンクは、すべての消費者まで
    • 待ち時間が全体のチャンクを読み込みますInputStreamのいくつかの実装を作成します
    • 次のチャンクを読ん

    シンプル見ながら、それはliveloのような様々な問題が上昇する可能性どのコンシューマが死んでも、すべてのInputStreamメソッドを実装し、バリア/ラッチなどを使用してコンシューマ自身のフォーク/ジョインを制御します。

    1人のバディーは実装するのが半時間だと私に言いました。

    グーグルでは成熟したものを使うのが好きです(グーグルでは結果が得られませんでした。私のgoogle-fuは十分ではありません)か、 "ソース"ストリーム全体を一時ファイルにコピーしないでくださいこれをデータのソースとして使用します。後者の解決法はより信頼性が高いようですが、ギガバイトファイルの作成に終わることがあります(たとえばストリーミングオーディオを処理する場合)。

  • +0

    データをファイルに書き込んで、N個のFileInputStreamを生成できますか? –

    +0

    @JonLin彼は質問の終わりに向かって言ったように、彼はすることができます。 –

    答えて

    3

    私が見ているように、バッファリングの種類は少なくともある程度はありますので、さまざまな消費者が現在最も遅い消費者に常にぎこちなく動かされることなく、さまざまなペースでストリームを移動できます。基本的に、最悪の場合のパフォーマンスと並行性の恩恵はほとんどありません。

    これまで使用していた消費者を各チャンクにタグ付けし、完全に使い切った消費者を削除することができます。たぶんこれは、まだ使用されていない各チャンクへの参照を保持する各消費者によって達成される可能性があります。これによりGCは自動的に使用済みのチャンクを処理します。プロデューサはWeakReferenceのリストをチャンクに保持しているので、まだ使用されていないチャンクの数に応じたハンドルを持ち、そのスロットルに基づいて調整することができます。

    私はまた、内部的にプロデューサInputStreamと通信するスレッドごとに別のInputStreamインスタンスを持つことを考えています。こうすることで、ライブロックハザードの簡単な解決策が得られます。try ... finally { is.close(); } - 死んでいる消費者は、独自の入力ストリームを閉じます。これはプロデューサに伝えられます。

    消費者1人あたりArrayBlockingQueueを使用していくつかのアイデアがあります。プロデューサーをブロックしたりビジーにしたりせずに、すべての消費者が適切に供給されるようにすることには、ある程度の困難があります。

    +0

    私はそれがほんのわずかな利点であるとは言いません。5人の消費者が1秒間働いていて、1人の消費者が2秒間働いていると、同時呼び出しで2秒、7秒で7秒です。または私はここに何かを逃していますか?チャンクとバッファにタグが付いているので、私は避けたいメモリ消費量に達するでしょう。 – jdevelop

    +0

    はい、あなたの言うことは避けられません。しかし、消費者が平均的にバランスがとれているが、そのパフォーマンスが大きく変化している場合は、現在遅れている消費者を常に待っていると、同意の機会が失われます。バッファリングが助けになるでしょう。スレッドの優先順位の調整を導入すると、実際にこのような状況になる可能性があります。 –

    0

    パイプストリームの使用を検討しましたか?あなたのプロデューサには、ファイルから読み込んだものを何でも投げる1つ以上のPipedOuputStreamがあります。パイプの反対側には、対応するPipedInputstream(あなたのライブラリと共有できるInputStream)で異なる消費者スレッドがあります。

    プロデューサスレッドは、パイプのどちら側からデータを送信するかを決めることができます。これにより、パイプの反対側で読み取るコンシューマスレッドに対して処理されるデータが提供されます。

    コンシューマスレッドからデータを戻す必要がある場合は、逆方向に別のパイプを作成してデータを送り返すことができます。

    +1

    'PipedOutputStream'は、コンシューマーが遅れてすぐにプロデューサーをブロックし、他のすべてのコンシューマーを飢えさせます。 –

    0

    Apache ActiveMQのようなJava Messaging Service(JMS)実装を試すことができます。

    あなたの場合、というトピックを作成する必要があります。Topics vs. Queuesを参照)。トピックはプロデューサによって作成され、N個のコンシューマにパブリッシュされます。コンシューマは同時に実行され、各コンシューマはまったく同じデータを受信します。

    InputStreamを使用したいので、send messages are streamsの章があります。

    通常、プロデューサとコンシューマは別々のプロセスであり、おそらくネットワーク上の異なるマシン上で動作しているとします。私はそれを単一のJVMで完全に実行するように設定することができると思います。これはJMSの実装に依存します。これらはまたかなり有名である:HornetQ by JBossRabbitMQ、および他の多くの束。

    関連する問題