2012-04-16 2 views
1

私はスレッドを間違って使用していると思うので、これが良いデザインかどうか尋ねたいと思っていました。基本的には、キューからデータを取り出して処理する(処理は純粋な数学で100%CPUを集中させます)、次にデータが良い場合は「良い」キューに送信します。そうでなければ、完全に破棄されるか、さらなる処理のために最初の「作業」キューに戻される。これは高水準のロジックです。キューがメモリに入っていたとき、私のプログラムはすべてのコアを使用していました。私のデータが増えるにつれ、キューサーバーを使用してキューを保存し、複数のマシンに処理を分散させることにしましたが、遅い(各コアのわずか40%-60%が使用されています)。フォークスレッドがプログラムをスピードアップしていない、私は自分の状況で正しく使用していますか?

私は自分のコード(yourkitとnetbeansでビルドされたものを使用しています)をプロファイリングしようとしましたが、ほとんどの時間(80%)がキュープログラムに費やされています。私はおそらく私のプログラムでは、すべての外部プログラムのものを別のスレッドにプッシュすることによって絶えず稼働しているナンバー・クランチを保つことができると思ったが、パフォーマンスには役に立たず、私はそれが間違っているのではないかと思っている。私は確信していませんが、私は、既存のスレッド(親スレッド)からスレッド(子スレッド)を起動すると、親が完了する前に子を完了しなければならないのだろうか?

私のコードはかなり大きく、99%は必要ではないので、高レベルのバージョンを書くだけです(コンパイルできないかもしれませんが、自分が何をやっているのか分かりません)。

public class worker { 

    private static ExecutorService executor; 
    static { 
     final int numberOfThreads = 4; 
     executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());     
    } 
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     // TODO Auto-generated method stub 
     System.out.println("starting worker.."); 
     //Connection information goes here 
     channel.basicQos(50); //this is part of the connection, the queue server only gives 50 messages without acknowledgment 

     while (true) { 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //gets data from queue 
      String message = new String(delivery.getBody());   
      executor.submit(new DoWork(channel, message, delivery)); 
     } 

class DoWork implements Runnable{ //where the main work happens 
    //setup variables, basically passing queue connection information as well as data here, so I only need to rely on one connection     
    public void run() { 
     new Thread(new Awk_to_Queue(channel, delivery)).start(); //this sends an Awk_to_Queue to the queue, I launch a thread for this so my program can keep working. 

     if (data is good) { 
      new Thread(new Send_to_Queue("success_queue", message1, channel)).start();                 
      continue; 
     }  else if (Data is not good but not bad either) { 
     new Thread(new Send_to_Queue("task_queue", message2, channel)).start();              
     }  

class Send_to_Queue implements Runnable{        
    public void run() { 
     //takes data in and sends to queue in the way I used to previous do it, but just does it in a thread. queue connection is passed over so I only need to have one connection 
    } 
} 


class Awk_to_Queue implements Runnable{ 
    public void run() { 
     //awk's queue so queue server can send one more piece of data to queue up 
    } 
} 

あります。ごめんなさい、読みにくいです(私がやっていることの構造を示すためにたくさんのものを削除しました)。スレッドをフォークしても速度に影響しないことは間違っていますか(速くなるのか、プロファイラの結果が変わるのか分かりません)問題は私がスレッドをフォークする方法(new Thread(new Awk_to_Queue(channel, delivery)).start();)ですか、それともスレッドデザインのようなものですか?

+0

私にはあまりにも多くのスレッドがあるように感じます。どのようなメリットがあるのですか?一度この作業が負荷を受けたら、最終的にいくつのプロセスが実行されていますか? – sarnold

+0

@sarnold良い質問ですが、私はプログラムを見ると、4つのスレッドがラベル付きのthread1-1(-2、-3など)を実行していて、次にスレッド2で別の4個のセットが実行されていることを示しています。一度に1つまたは2つのキューを処理する別のエグゼキュータを起動する方が良いと思いますか? –

答えて

6

2つのことが頭に浮かぶ:

1)リモート・キューを読み込むだけのスレッドは、main()メソッドでは、あなたの無限ループを実行しているメインスレッドであるように思われます。しかし、あなたがその中に物事を入れるのは速いのですが、あなたがそれらを取り戻すことができるよりも速く処理することはありません。

2)産卵new Thread();は、「高価な」操作です。単一の短いタスクのために新しいスレッドを絶えず作成することは、メモリ割り当てとネイティブリソースを交代するだけです。これらの「キュー・プット」を第2のExecutorServiceにオフロードして、無限の数のスレッドを生成するのではなく、サイズを調整する必要があります。

+0

良い点、私はあなたの2つの提案を試みます。私はポイント2を行う方法を知っていますが、ポイント1は確実ではありません。無限のwhileループ以外では、どのようにキューを読み続けるのですか?私は自分のプログラムに50個の作業を提供するQoSを設定しましたが、私は4つのスレッドを実行しても同じ効果がありますか? –

+0

それは 'consumer.nextDelivery();'とコンシューマオブジェクトがスレッドセーフであるかどうかによって決まります。そうであれば、N個の新しいスレッドの中にループコードを置くだけです。 – Affe

+0

ありがとう、彼らはスレッドセーフです、私はあなたの考えを試してみます。最後の質問の1つは、新しいスレッド()をExecutorServiceに置き換えると、スレッド内でそれを起動することに何らかの影響がありますか?私はメインの起動スレッドを実行すると私のプログラムはすべてのスレッドが終了するまで終了しないので私は尋ねています。私の場合、DoWorkはSend_to_Queueが完了するか、それとも完全に独立しているかを待たなければなりませんか? –

関連する問題