2012-04-19 15 views
1

スレッド同期をJavaでかなり読んだことがあります。私は現在、Bounded-Buffer問題を試しています。プロデューサは、消費者が消費し続ける間、バッファ上にプロダクトを生成し続けます。私はスレッド同期をまったく取得しません

プロデューサは、バッファがいっぱいになってから別の製品を生産するまで待機します。 バッファが空の場合、コンシューマは待機します。

しかし、私の問題は、バッファがいっぱいになるまで、バッファが空のときだけプロデューサが生成を開始することです。消費者は、バッファが空になるまでバッファがいっぱいになったときに消費を開始します。

例(バッファサイズ:5)

Produced Product 1 
Produced Product 2 
Produced Product 3 
Produced Product 4 
Produced Product 5 
Consumed Product 1 
Consumed Product 2 
Consumed Product 3 
Consumed Product 4 
Consumed Product 5 
Produced Product 6 
Produced Product 7 
Produced Product 8 
Produced Product 9 
Produced Product 10 
Consumed Product 6 
Consumed Product 7 
Consumed Product 8 
Consumed Product 9 
Consumed Product 10 
Produced Product 11 
Produced Product 12 
Produced Product 13 
Produced Product 14 
Produced Product 15 
Consumed Product 11 
Consumed Product 12 
Consumed Product 13 
Consumed Product 14 
Consumed Product 15 
Produced Product 16 
Produced Product 17 
Produced Product 18 
Produced Product 19 
Produced Product 20 
Consumed Product 16 
Consumed Product 17 
Consumed Product 18 
Consumed Product 19 
Consumed Product 20 
Produced Product 21 
Produced Product 22 
Produced Product 23 
Produced Product 24 
Produced Product 25 
Consumed Product 21 
Consumed Product 22 
Consumed Product 23 
Consumed Product 24 
Consumed Product 25 
Produced Product 26 
Produced Product 27 
Produced Product 28 
Produced Product 29 
Produced Product 30 
Consumed Product 26 
Consumed Product 27 
Consumed Product 28 
Consumed Product 29 
Consumed Product 30 

プロデューサがあれば、バッファは、バッファが空であるか否か、完全ではないとして生成するように、私はそれをしたいです。その後、バッファが空でないかぎり、消費者がバッファをいっぱいにするかどうかにかかわらず消費するようにします。

私のコードで何が問題になっていますか?

import java.util.LinkedList; 
import java.util.Queue; 
import java.util.Random; 
import javax.swing.JOptionPane; 

public class ProducerConsumer{ 

    final static Queue<Product> buffer = new LinkedList<>(); 
    private static int buffer_size, no_items, itemno = 1; 
    private static Random r = new Random(); 
    public static void main(String[] args) { 
     try{ 
      buffer_size = Integer.parseInt(JOptionPane.showInputDialog("Input Buffer size (default: 5)")); 
      if(buffer_size<0){ 
       buffer_size = 5; 
      } 
     }catch(NumberFormatException nfe){ 
      buffer_size = 5; 
     } 
     try{ 
      no_items = Integer.parseInt(JOptionPane.showInputDialog("Input No. of Items (default: 10)")); 
      if(no_items<0){ 
       no_items = 10; 
      } 
     }catch(NumberFormatException nfe){ 
      no_items = 10; 
     } 
     Producer producer = new Producer(); 
     Consumer consumer = new Consumer(); 
     producer.start(); 
     consumer.start(); 
    } 

    static class Product { 
     private String name = "Product X"; 
     private int productno; 

     public Product(int productno) { 
      this.productno = productno; 
      this.name = "Product "+productno; 
     } 
     public int number() { 
      return productno; 
     } 
     @Override 
     public String toString() { 
      return name; 
     } 
    } 
    static class Producer extends Thread{ 
     public Producer(){ 
     } 
     public void produce(){ 
      Product p = new Product(itemno++); 
      try { 
       this.sleep(r.nextInt(100)); 
      } catch (InterruptedException ex) { 
       Thread.interrupted(); 
      } 
      buffer.add(p); 
      System.out.println("Produced "+p); 
     } 
     @Override 
     public void run(){ 
      synchronized(buffer){ 
       while(itemno<=no_items){ 
        while(buffer.size()==buffer_size){ 
         try{ 
          buffer.wait(100); 
         }catch(InterruptedException e){ 
          Thread.interrupted(); 
         } 
        } 
        produce(); 
        buffer.notifyAll(); 
       } 
      } 
     } 
    } 
    static class Consumer extends Thread{ 
     public Consumer(){ 
     } 
     public boolean consume(){ 
      try { 
       this.sleep(r.nextInt(100)); 
      } catch (InterruptedException ex) { 
       Thread.interrupted(); 
      } 
      Product product = buffer.remove(); 
      System.out.println("Consumed "+product); 
      return product.number()==no_items; 
     } 
     @Override 
     public void run(){ 
      synchronized(buffer){ 
       boolean end = false; 
       while(!end){ 
        while(buffer.isEmpty()){ 
         try{ 
          buffer.wait(100); 
         }catch(InterruptedException e){ 
          Thread.interrupted(); 
         } 
        } 
        end = consume(); 
        buffer.notifyAll(); 
       } 
      } 
     } 
    } 
} 
+1

また、サイドノートに。あなたは 'this.sleep(millis) 'ではなく' Thread.sleep(millis) 'を使っていなければなりません。 'sleep'は' Thread.currentThread() 'で常に実行される静的メソッドです。あなたがその方法でそれを使用しているのであなたのために働きますが、将来あなたを傷つけるかもしれません。 –

答えて

4

あなたはbufferモニターを取得してnotifyAllの前にN回を作成し、すべてのwhileループ内で、​​ブロックを終了する介してバッファモニタをリリースしているからです。

ブロック​​の外側にwhileを入れてください。他のスレッドには、同時に生成/消費中にロックを取得する機会があります。通知スレッドが​​地域に

編集を終了したまでnotifyAllが実際に他のスレッドに影響を与えないことを

注:私は、提案された変更を行った後、それを実行しましたし、私は次の出力

Produced Product 1 
Produced Product 2 
Produced Product 3 
Consumed Product 1 
Consumed Product 2 
Consumed Product 3 
Produced Product 4 
Produced Product 5 
Consumed Product 4 
Consumed Product 5 
Produced Product 6 
Consumed Product 6 
Produced Product 7 
Produced Product 8 
Produced Product 9 
Produced Product 10 
Consumed Product 7 
Consumed Product 8 
Consumed Product 9 
Consumed Product 10 
+1

実際、可能な限り小さなコードブロックに同期を制限しようとします。 (例えば、実際には 'sleep()'を呼び出すべきではありません。) – biziclop

+0

ありがとう!それが本当に私を助けました! –

+1

マイナーニックピック:モニターは 'wait()'を呼び出すことによっても解放されます。 –

0

以来です両方のスレッドがバッファ上で同期されている場合は、いずれかのスレッドのみがいつでも実行されます。 1つのスレッドがもう一方のスレッドにシグナルを送り、buffer.wait()を呼び出すと自身を中断します。 buffer.wait()呼び出し(buffer.size()== buffer_size)と(buffer.isEmpty())を含むwhileループのロジックは、ループが関数(プロデューサまたはコンシューマ)のときにのみ実行できるようにします。完了しており、続行できません。

プログラムは次のように実行します。プロデューサは5つのアイテムを生成し、whileループ条件は真となり、コンシューマが実行できるようにbuffer.wait()を呼び出します。コンシューマは、バッファが空になるまで実行して消費し始めます。この場合、while条件はtrueになり、buffer.wait()を呼び出してプロデューサを再び実行させます。

両方のスレッドを同時に実行できるようにするには、おそらく実際に実行していることでしょう。バッファへの安全なアクセスが必要な場合にのみ、synchronize(バッファ)を呼び出す必要があります。それを行う1つの方法は次のコードブロックです:

static class Producer extends Thread { 
    public Producer() { 
    } 

    public void produce() { 
     Product p = new Product(itemno++); 
     try { 
      this.sleep(r.nextInt(100)); 
     } catch (InterruptedException ex) { 
      Thread.interrupted(); 
     } 
     synchronized(buffer) { 
      buffer.add(p); 
     } 
     System.out.println("Produced " + p); 
    } 

    @Override 
    public void run() { 
     while (itemno <= no_items) { 
      while (buffer.size() == buffer_size) { 
       try { 
        sleep(100); 
       } catch (InterruptedException e) { 
        Thread.interrupted(); 
       } 
      } 

      produce(); 
     } 
    } 
} 

static class Consumer extends Thread { 
    public Consumer() { 
    } 

    public boolean consume() { 
     try { 
      this.sleep(r.nextInt(100)); 
     } catch (InterruptedException ex) { 
      Thread.interrupted(); 
     } 

     Product product; 
     synchronized (buffer) { 
      product = buffer.remove(); 
     } 

     System.out.println("Consumed " + product); 
     return product.number() == no_items; 
    } 

    @Override 
    public void run() { 
     boolean end = false; 
     while (!end) { 
      while (buffer.isEmpty()) { 
       try { 
        sleep(100); 
       } catch (InterruptedException ex) { 
        Logger.getLogger(TrashMe.class.getName()).log(Level.SEVERE, null, ex); 
       } 
      } 
      end = consume(); 
     } 
    } 
} 
関連する問題