2016-04-11 10 views
2

スレッドセーフなブロッキングキューをjavaで実装していますが、スレッドが順番に実行されていて並列に実行されていないようです。誰かが私が間違っていることを見つけるのを助けることができますか?私のコードは次の通りです:Javaでブロッキングキューを実装する際の問題

package com.example; 

import java.util.LinkedList; 
import java.util.List; 

class Producer implements Runnable{ 

    BlockingQueue blockingQueue; 

    public Producer(BlockingQueue blockingQueue) { 
     this.blockingQueue = blockingQueue; 
    } 

    @Override 
    public void run() { 
     int counter = 0; 
     while (true) 
     { 
      try 
      { 
       blockingQueue.enqueue(counter++); 
      } 
      catch (InterruptedException ex) 
      { 
       ex.printStackTrace(); 
      } 
     } 
    } 
} 

class Consumer implements Runnable{ 

    BlockingQueue blockingQueue; 

    public Consumer(BlockingQueue blockingQueue) { 
     this.blockingQueue = blockingQueue; 
    } 

    @Override 
    public void run() { 


     while (true) 
     { 
      try 
      { 
       blockingQueue.dequeue(); 
      } 
      catch (InterruptedException ex) 
      { 
       ex.printStackTrace(); 
      } 
     } 
    } 
} 

public class Test{ 
    public static void main(String[] args) { 

     BlockingQueue blockingQueue = new BlockingQueue(10); 
     Thread producer = new Thread(new Producer(blockingQueue), "Prod"); 
     Thread consumer = new Thread(new Consumer(blockingQueue), "Cons"); 
     producer.start(); 
     consumer.start(); 
    } 
} 

class BlockingQueue { 

    private List queue = new LinkedList(); 
    private int limit = 10; 

    public BlockingQueue(int limit){ 
     this.limit = limit; 
    } 


    public synchronized void enqueue(Object item) 
      throws InterruptedException { 

     while(this.queue.size() == this.limit) { 
      System.out.println("Wait Enque : "+Thread.currentThread().getName()); 
      wait(); 
     } 

     Thread.sleep(1000); 
     System.out.println("Add Item : " + Thread.currentThread().getName()); 
     this.queue.add(item); 
     notifyAll(); 
    } 


    public synchronized Object dequeue() 
      throws InterruptedException{ 

     while(this.queue.size() == 0){ 
      System.out.println("Wait Denque : "+Thread.currentThread().getName()); 
      wait(); 
     } 

     Thread.sleep(1000); 
     System.out.println("Remove Item : " + Thread.currentThread().getName()); 
     notifyAll(); 
     return this.queue.remove(0); 
    } 

} 

私はマルチスレッド化が初めてです。

これは私が取得しています出力されます。BlockingQueue

Add Item : Prod 
Add Item : Prod 
Add Item : Prod 
Add Item : Prod 
Add Item : Prod 
Add Item : Prod 
Add Item : Prod 
Add Item : Prod 
Add Item : Prod 
Add Item : Prod 
Wait Enque : Prod 
Remove Item : Cons 
Remove Item : Cons 
Remove Item : Cons 
Remove Item : Cons 
Remove Item : Cons 
Remove Item : Cons 
Remove Item : Cons 
Remove Item : Cons 
Remove Item : Cons 
Remove Item : Cons 
Wait Denque : Cons 
Add Item : Prod 
Add Item : Prod 
Add Item : Prod 
+0

出力の一部を表示することができます。一見すると、あなたのコードはOKです。 – GhostCat

+0

彼は彼自身のクラスBlockingQueueを提供しています。たぶんそれは悪い命名です。本当に問題はありません。 – GhostCat

+0

はい私は私の質問を編集しました – Qasim

答えて

2

あなたの実装は避けるべきである「仕事で眠れる」パターンを示しています。

何らかの理由で、Thread.sleepをデータ構造の​​のメソッドenqueue and dequeueに呼び出すことにしました。私はそれがまったく必要とは思わない。これらのメソッドが最低限必要なのは、共有可能な可変状態(つまり、queue)をスレッドセーフな方法で使用することです。 dequeueため

public synchronized void enqueue(Object item) 
     throws InterruptedException { 

    while (this.queue.size() == this.limit) { 
     System.out.println("Wait Enque : "+Thread.currentThread().getName()); 
     wait(); 
    } 

    System.out.println("Add Item : " + item.toString() + " " + Thread.currentThread().getName()); 
    this.queue.add(item); 
    notifyAll(); 
} 

と同様に:そして、これらの方法でThread.sleep()への呼び出しを除くすべてのものが良い最初の試みです。

あなたのスレッドがしていることは、彼らが非常に貪欲であることです:-)。たぶん、あなたは実際にあなたのスレッドのrunメソッド内部デキューされたアイテムで何かを実行する必要があります。

while (true) 
    { 
     try 
     { 
      Object deq = blockingQueue.dequeue(); 
      Thread.sleep(1000); // sleeping to simulate using the de-queued item 

     } 
     catch (InterruptedException ex) 
     { 
      ex.printStackTrace(); 
     } 
    } 

ので、実際には、私がしたすべてはあなたのデータ構造の方法のうち、眠っ参加しました。そして私は、インターリーブと私は期待したいものです以下の出力が得られます。もちろん

Add Item : 0 Prod 
Remove Item : 0 Cons 
Add Item : 1 Prod 
Remove Item : 1 Cons 
Add Item : 2 Prod 
Add Item : 3 Prod 
Remove Item : 2 Cons 
Add Item : 4 Prod 
Add Item : 5 Prod 

、1を提案することができますいくつかの改善があります。

  1. は、すべてのフィールドfinalを行います。
  2. 公正性を確保するために、CountDownLatchのようなものを使用してください。

いくつかの洞察については、java.util.concurrent.LinkedBlockingQueueもご覧ください。我々はsynchronized blocksを使用する場合もintrinsic locks呼ばれるので

0

あなたはこの動作を取得し、我々は、待機中のスレッドの一つが(intrinsic lockunfairロックで)最初のロックを取得する保証はありません。期待どおりの動作を得るには、fair explicit lockを使用する必要があります。この方法で、ロックを待っている最初のスレッドが最初にスレッドを獲得することが確実になります。ここでは、そのはconditionを対応するfair explicit lockを作成する方法である:

class BlockingQueue { 

    private final Lock lock = new ReentrantLock(true); 
    private final Condition condition = lock.newCondition(); 
    ... 

エンキューのコードは、その後、次のようになります。

public void enqueue(Object item) 
    throws InterruptedException { 

    try { 
     lock.lock(); 
     while(this.queue.size() == this.limit) { 
      System.out.println("Wait Enque : "+Thread.currentThread().getName()); 
      condition.await(); 
     } 

     Thread.sleep(1000); 
     System.out.println("Add Item : " + Thread.currentThread().getName()); 
     this.queue.add(item); 
     condition.signalAll(); 
    } finally { 
     lock.unlock(); 
    } 
} 

そして最後にデキューのコードは次のようになります。

public Object dequeue() 
    throws InterruptedException{ 

    try { 
     lock.lock(); 
     while(this.queue.size() == 0){ 
      System.out.println("Wait Denque : "+Thread.currentThread().getName()); 
      condition.await(); 
     } 

     Thread.sleep(1000); 
     System.out.println("Remove Item : " + Thread.currentThread().getName()); 
     condition.signalAll(); 
     return this.queue.remove(0); 
    } finally { 
     lock.unlock(); 
    } 
} 

出力は次のとおりです。

Add Item : Prod 
Remove Item : Cons 
Add Item : Prod 
Remove Item : Cons 
Add Item : Prod 
Remove Item : Cons 
Add Item : Prod 
Remove Item : Cons 
Add Item : Prod 
Remove Item : Cons 
Add Item : Prod 
Remove Item : Cons 
Add Item : Prod 
Remove Item : Cons 
関連する問題