7

プロデューサとコンシューマのパターンを理解することは、プロデューサとコンシューマの間で共有されるキューを使用して実装できることです。プロデューサは共有キューに作業を送信し、コンシューマはキューを取得して処理します。また、プロデューサが直接コンシューマに提出することで実装することもできます(コンシューマのエグゼキュータサービスに直接提出するプロデューサスレッド)。プロデューサコンシューマ - Executors.newFixedThreadPoolを使用

ここでは、スレッドプールの一般的な実装を提供するExecutorsクラスを見てきました。 newFixedThreadPoolメソッドは、仕様に従って、「共有された無制限キューから動作する固定数のスレッドを再利用する」。彼らはここでどのキューを話しているのですか?

プロデューサがコンシューマにタスクを直接送信する場合は、実行可能ファイルのリストを含むExecutorServiceの内部キューですか?

プロデューサが共有キューに送信する場合は、中間キューですか?

私は全体のポイントを逃しているかもしれませんが、誰かが明確にしてくださいか?

答えて

4

あなたは正しく、ExecutorServiceはスレッドプールだけでなく、Producer-Consumerの完全実装です。この内部キューは実際にというタスクを保持するRunnable(正確にはFutureTask)のスレッドセーフキューです。

プール内のすべてのスレッドは、そのキューでブロックされ、タスクの実行を待機します。あなたがsubmit()のタスクを実行すると、ちょうど1つのスレッドがそれを取り出して実行します。もちろんsubmit()はプール内のスレッドが処理を終了するのを待っていません。

一方、膨大な数のタスク(または長時間実行するタスク)を提出すると、プール内のすべてのスレッドが占有され、キュー内でいくつかのタスクが待機してしまう可能性があります。タスクで完了したスレッドは、直ちにキューから最初のスレッドを選択します。

+0

をただ明確にする: 'ExecutorService'は単なるインターフェイスです。あなたは* ExecutorService'を、同じスレッド内で実行されるとすぐに各実行可能ファイルを実行するクラスで実装することができます(そして、それだけで 'java.util.concurrent'パッケージに実装されていると思います) 。実際には、ほとんどのExecutorService実装は完全なプロデューサ - コンシューマ実装です。 –

+0

あなたは絶対に正しいです、 'ExecutorService'は' Executor.newFixedThreadPool() 'が返すもので、' ExecutorService' *を実装しています。明確化のためにありがとう。 –

+1

ありがとうございます。したがって、newFixedThreadPool(8)を使用してエグゼキュータサービスを作成し、その上で約1000の実行可能タスクを実行する場合は、シナリオの理解を確認してください。 1.最大で8つのスレッドが作成されます 2.処理の開始時8つのスレッドがビジーである間に、992のタスクが内部キュー に保持されます。また、その無制限キューのため、実行可能サービスに送信できるタスクの数に上限はありません。 バインドされたキューでExecutorServiceを作成すると、上記のシナリオにどのような影響がありますか?それはより良い実行するか? ありがとうございます、O. – Oxford

0

チェックこのアウト:
Producer-Consumer example in Java (RabbitMQ)(それは別のライブラリのために書かれていますが、それはJavaでだと、それは明確な概念を示し;)
はそれが役に立てば幸い!

P.S.:Actually、それはいくつかの例がありますが、あなたのアイデアを得る;)

1
public class Producer extends Thread { 
    static List<String> list = new ArrayList<String>(); 

    public static void main(String[] args) { 
     ScheduledExecutorService executor = Executors 
       .newScheduledThreadPool(12); 
     int initialDelay = 5; 
     int pollingFrequency = 5; 
     Producer producer = new Producer(); 
     @SuppressWarnings({ "rawtypes", "unused" }) 
     ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay( 
       producer, initialDelay, pollingFrequency, TimeUnit.SECONDS); 
     for (int i = 0; i < 3; i++) { 
      Consumer consumer = new Consumer(); 
      @SuppressWarnings({ "rawtypes", "unused" }) 
      ScheduledFuture schedFutureConsumer = executor 
        .scheduleWithFixedDelay(consumer, initialDelay, 
          pollingFrequency, TimeUnit.SECONDS); 
     } 

    } 

    @Override 
    public void run() { 
     list.add("object added to list is " + System.currentTimeMillis()); 
           ///adding in list become slow also because of synchronized behavior 
    } 
} 

class Consumer extends Thread { 

    @Override 
    public void run() { 
     getObjectFromList(); 
    } 

    private void getObjectFromList() { 
     synchronized (Producer.list) { 
      if (Producer.list.size() > 0) { 
       System.out.println("Object name removed by " 
         + Thread.currentThread().getName() + "is " 
         + Producer.list.get(0)); 
       Producer.list.remove(Producer.list.get(0)); 
      } 
     } 
    } 
} 
関連する問題