2012-04-28 10 views
2

私のプログラムをマルチスレッドにする方法をよく分かりません。基本的に私は接続して動作させるキューサーバーを持っていますが、より多くのスレッドを起動してより速く作業を送信すると、スレッドがブロックされていることがわかります。スレッドでサーバーへの接続を維持していますか?

ここに私がやっていることの例があります(私のプログラムでは、共有接続から派生したいくつかのデータとチャンネルを送信しています)。

class Send_to_Queue implements Runnable{ 

    protected String queue_name = null; 
    protected Channel channel = null; 
    protected byte[] message = 0 

    public Send_to_Queue(String queue_name, byte[] message, Channel channel) { 
     // TODO Auto-generated constructor stub 
     this.queue_name = queue_name; 
     this.message = message; 
     this.channel = channel;   
    } 

チャネルは、ブロックされているすべてのスレッドが共有するスレッドに固有のものではありません。私はThreadPoolExecutorの人生の中で新しいチャンネルを作成し、各タスクで新しいもの(高価なもの)を作成しないようにする方法がわからないため、これを行うための最善の方法を少し混乱させるだけです。仕事がない場合はシャットダウンしても問題ありませんが、4スレッドと100ユニットのワークがあれば、新しいチャンネルを100回ではなく4回だけ作成したいと思っています。

私は新しいチャンネル/私のサーバーへの接続は、すべてのインスタンスに確立されていない方法でそれを行う方法を理解していないようです。スレッドに接続を渡すと、新しいチャネル(this.channelの下に新しいチャネル毎回作成された)

+0

なぜ、各スレッドで新しいチャンネルを作成するのは費用がかかりますか?私にはそれは明らかな方法です。 – Gray

+0

@Gray申し訳ありませんが、私はそれを間違って書いたと思います、私は新しいチャンネルを作りたいと思っていましたが、スレッドが作成されたときだけ(どのようにわからないのですか)毎回のデータがスレッド回)。あるチャンネルをスレッドが使用できるのは、そのスレッドを常に作成する理由です。スレッドごとに1つの接続が存続している間、そのスレッドは生き続けることができます。 – Lostsoul

+0

@Grayまたは私はこれを間違った方法で考えており、作業単位ごとに新しいチャンネルを開始していますか?私がこれについて考えてきたもう一つの理由は、スレッドごとに新しい接続を確立するために同じロジックを使うことができるからです。パフォーマンスの違いをテストできます(基本的には非常に小さい作業ですが、私はできるだけオーバーヘッドを減らそうとしています)。 – Lostsoul

答えて

1

を開始、それを持つ私はあなたがThreadが構築されたとき、それは、このチャネルを使用して行うことができるように新しいChannelを作成したいと思います並行作業。これを行うにはいくつかの異なる方法があります。通常、サーバーのシナリオでは、サーバーはその接続に新しいChannelを生成する接続を受け入れます。そしてChannelをハンドラThreadに渡します。ここでは、ソケットと同じだが、あなたのアイデアを得る:

ServerSocket socket = ... 
... 
while (true) { 
    Socket clientSocket = socket.accept(); 
    new Thread(new MyRunnable(clientSocket)).start(); 
} 
... 
public class MyRunnable implements Runnable { 
    private Socket clientSocket; 
    public MyRunnable(Socket clientSocket) { 
     this.clientSocket = clientSocket; 
    } 
    public void run() { 
     while (!done) { 
      // use the socket associated with this thread 
     } 
    } 
} 

あなただけのリモートサーバーへの接続を行うために、スレッドの束を作成しているし、あなたがループ内でまたはExecutorServiceプールのいずれかを使用していることを行うことができます動作しない場合:

ExecutorService threadPool = Executors.newFixedThreadPool(100); 
for (int i = 0; i < 100; i++) { 
    threadPool.submit(new Runnable() { 
     public void run() { 
      Channel threadChannel = // create channel here; 
      while (!done) { 
       // use the per-thread channel 
      } 
     } 
    }); 
} 

もう一つの人気パターンは、スレッドが一連のユニットで動作するようにSocketまたはChannelを使用することです。その目的でBlockingQueueを使用することができます。

BlockingQueue<WorkUnit> workQueue = new LinkedBlockingQueue<WorkUnit>(); 
... 
// add work units to the work queue 
for (int i = 0; i < 1000; i++) { 
    // add work to the queue 
    workQueue.put(new WorkUnit(i)); 
} 

// the MyRunnable above can then be modified like this: 
    ... 
    public void run() { 
     while (!done) { 
      WorkUnit workUnit = workQueue.take(); 
      // use MyRunnable socket or channel and do the WorkUnit 
     } 
    } 
+0

私はあなたのアプローチを理解し、新しいチャネルを作成してそれを実行可能ファイルに送信しますが、もし100個の作業単位があれば、スレッドが動作するには1つのスレッドしか必要としないときに100チャネルを作成しませんか? – Lostsoul

+0

ああ、私は@Lostsoulを見る。ですから、Yの作業単位で作業したいXスレッドがある場合、私はブロッキングキューを作成します。私は私の答えを修正します。 – Gray

+0

私はブロッキングキューを作成する方法を知っています、私はまだそれに固有のチャネルを送信しますか?スレッドが新しい作業を開始するたびに作成してください。私は、作成時にチャンネルを作成し、そのライフを維持するためのスレッドを(私の例のように)どうやって取得するのか分かりません(ただ1つの 'ワークユニット'を実行したときだけではありません) – Lostsoul

関連する問題