2011-10-27 1 views
1

私はシンプルなプロデューサーコンシューマーモデルを書いてBluetoothからのデータを読み込み、10kバイトごとにファイルに書き込む状況があります。メッセージホルダーとしてVectorを使用した標準的なP-Cモデルを使用しました。だから私はこれを変更して複数のスレッドコンシューマが同じメッセージを読むことができるようにするにはどうすればよいでしょうか?私は実際にAndroidの携帯電話でこれを使用しているので、JMSはおそらくオプションではありません。シングルプロデューサーシングルコンシューマー今では複数のコンシューマーが必要

static final int MAXQUEUE = 50000; 
private Vector<byte[]> messages = new Vector<byte[]>(); 

/** 
* Put the message in the queue for the Consumer Thread 
*/ 
private synchronized void putMessage(byte[] send) throws InterruptedException { 

    while (messages.size() == MAXQUEUE) 
     wait(); 
    messages.addElement(send); 
    notify(); 
} 


/** 
* This method is called by the consumer to see if any messages in the queue 
*/ 
public synchronized byte[] getMessage()throws InterruptedException { 
    notify(); 
    while (messages.size() == 0 && !Thread.interrupted()) { 
     wait(1); 
    } 
    byte[] message = messages.firstElement(); 
    messages.removeElement(message); 
    return message; 
} 

私は

答えて

0

これは、いくつかのコードを掘り下げていくつかの既存の例を修正するときの例として思いついたものです。

package test.messaging; 

import java.util.ArrayList; 
import java.util.concurrent.LinkedBlockingQueue; 

public class TestProducerConsumers { 

    static Broker broker; 

    public TestProducerConsumers(int maxSize) { 
     broker = new Broker(maxSize); 
     Producer p = new Producer(); 
     Consumer c1 = new Consumer("One"); 
     broker.consumers.add(c1); 
     c1.start(); 

     Consumer c2 = new Consumer("Two"); 
     broker.consumers.add(c2); 
     c2.start(); 

     p.start(); 
    } 

    // Test Producer, use your own message producer on a thread to call up 
    // broker.insert() possibly passing it the message instead. 
    class Producer extends Thread { 

     @Override 
     public void run() { 
      while (true) { 
       try { 
        broker.insert(); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     } 
    } 

    class Consumer extends Thread { 
     String myName; 
     LinkedBlockingQueue<String> queue; 

     Consumer(String m) { 
      this.myName = m; 
      queue = new LinkedBlockingQueue<String>(); 
     } 

     @Override 
     public void run() { 
      while(!Thread.interrupted()) { 
       try { 
        while (queue.size() == 0 && !Thread.interrupted()) { 
         ; 
        } 
        while (queue.peek() == null && !Thread.interrupted()) { 
         ; 
        } 
        System.out.println("" + myName + " Consumer: " + queue.poll()); 
       } catch (Exception e) { } 
      } 
     } 
    } 

    class Broker { 
     public ArrayList<Consumer> consumers = new ArrayList<Consumer>(); 

     int n; 
     int maxSize; 

     public Broker(int maxSize) { 
      n = 0; 
      this.maxSize = maxSize; 
     } 

     synchronized void insert() throws InterruptedException { 
        // only here for testing don't want it to runaway and 
        //memory leak, only testing first 100 samples. 
      if (n == maxSize) 
       wait(); 
      System.out.println("Producer: " + n++); 
      for (Consumer c : consumers) { 
       c.queue.add("Message " + n); 
      } 
     } 

    } 

    public static void main(String[] args) { 
     TestProducerConsumers pc = new TestProducerConsumers(100); 

    } 
} 
+0

忙しい待っている間、消費者の(*)*ループ中にこれらの*を使用しないでください!それは本当に悪いコーディングです。 * insert()*の* wait()*を含む*ブロッキングキュー*があなたのためにすべてを処理します。 (対応する* notify()*呼び出しはどこにありますか?) – JimmyB

+0

消費者がwait()を使用していないため、挿入時に通知する必要はありません。私のメッセージがリアルタイムである必要はないので、私は消費者の待ち時間の代わりにいくつかの睡眠を投げた。 – JPM

+0

それでは、 "if(n == maxSize)wait();"とは何ですか? – JimmyB

0

あなたは間違いなく、代わりにVectorqueueを使用する必要がありますMessage ParserセクションOreilly帳からコードを参照しています!
すべてのスレッドに独自のキューを与え、新しいメッセージが受信されると、すべてのスレッドのキューに新しいメッセージをadd()送ります。フレキシビリティのために、リスナーパターンも便利です。

編集:

[OK]を、私はあまりにも、例を追加しなければならないと感じ:
(クラシックオブザーバーパターン)

これはインタフェースで、すべての消費者が実装する必要があります。

public interface MessageListener { 
    public void newMessage(byte[] message); 
} 

プロデューサーは次のようになります。

public class Producer { 
    Collection<MessageListener> listeners = new ArrayList<MessageListener>(); 


    // Allow interested parties to register for new messages 
    public void addListener(MessageListener listener) { 
    this.listeners.add(listener); 
    } 

    public void removeListener(Object listener) { 
    this.listeners.remove(listener); 
    } 

    protected void produceMessages() { 
    byte[] msg = new byte[10]; 

    // Create message and put into msg 

    // Tell all registered listeners about the new message: 
    for (MessageListener l : this.listeners) { 
     l.newMessage(msg); 
    } 

    } 
} 

と消費者のクラスは、(私たちのためにINGのすべてのことwait() INGとnotify()を行いブロッキングキューを使用して)次のようになります。

public class Consumer implements MessageListener { 

    BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>(); 

    // This implements the MessageListener interface: 
    @Override 
    public void newMessage(byte[] message) { 
    try { 
     queue.put(message); 
    } catch (InterruptedException e) { 
     // won't happen. 
    } 
    } 

    // Execute in another thread:  
    protected void handleMessages() throws InterruptedException { 
     while (true) { 
      byte[] newMessage = queue.take(); 

      // handle the new message. 
     } 
    } 


} 
+0

ベクターとキューの両方はオブジェクトを保持でき、プリムは保持できません。ブロッキングキューと上記のコード全体を使用すると、queue.put(message)とqueue.take()のように簡単になります。 – JimmyB

+0

私はSettableFutureを動作させることができないので、私はあなたの例にもっと傾いています。 – JPM

+0

私は常にListenersでこのようなことを処理しました。これは、JMSシステムからのリアルタイムデータを取得し、これらの状態変更でカスタムコンポーネントを更新するためのGUI状態リスナーシステム全体を作成しました。 – JPM

1

パブ・サブメカニズムは間違いなくあなたが望むものを達成するための方法です。 Android用に開発することでJMSの使用が制限される理由はわかりませんが、これは単純な仕様です。 SOの this threadをチェックしてください。

+0

はい、それは主に、スレッド間でメッセージを送信する電話アプリで必要なもののようなものであるJMSの使用について話しています。 – JPM

+0

その場合、Observer/ObservableパターンにあなたのVector(またはHanno Binderとして提案されたQueue)の単純なラッピングができますか? – mazaneicha