2013-05-21 11 views
13

それぞれのワーカが独自の接続ソケットを持つJavaでワーカーのプールを構築する必要があります。ワーカースレッドは実行時にソケットを使用しますが、後で再利用するために開いたままにします。アドホックベースのソケットの作成、接続、破棄に伴うオーバーヘッドにはオーバーヘッドがかかりすぎるため、ソケット接続で作業者プールを事前準備しておく準備ができている必要があります。他のスレッド(ソケットはスレッドセーフではありません)から安全なソケットリソースを維持しながら、仕事を取るので、我々はこれらの線に沿って何かを必要とする...アプリケーションの起動時に接続オブジェクト(ソケット)を再利用するためにワーカースレッドのプールを事前に初期化する

public class SocketTask implements Runnable { 
    Socket socket; 
    public SocketTask(){ 
    //create + connect socket here 
    } 

    public void run(){ 
    //use socket here 
    } 

}

、我々は初期化したいです労働者と、うまくいけば、ソケット接続も何らかの形で...

MyWorkerPool pool = new MyWorkerPool(); 
for(int i = 0; i < 100; i++) 
    pool.addWorker(new WorkerThread()); 
グレーとjontejjから有益なコメントをもとに

作業はアプリケーションによって要求されるように、我々は、即時実行のための労働者のプールにタスクを送信...ワーキングコード
で更新

pool.queueWork(new SocketTask(..)); 


、私が働いて、次のコード...

SocketTask

を持っています

ExecutorServiceの

ExecutorService threadPool = 
    Executors.newFixedThreadPool(5, Executors.defaultThreadFactory()); 

    int tasks = 15; 
    for(int i = 1; i <= tasks; i++){ 
     threadPool.execute(new SocketTask("foobar-" + i)); 
    } 

私はいくつかの理由のために、このアプローチが好き...

  • ソケットは、並行性の問題を排除し、実行中のタスクに利用できる(ThreadLocalの経由)ローカルオブジェクトです。
  • ソケットは一度作成され、開いたままにされ、新しいタスクがキューに入れられると が再利用され、ソケットオブジェクトの作成/破棄のオーバーヘッドがなくなります。
+0

おかげで、私をたくさん助けました!私は不思議に思っていました:どのようにスレッドプールがシャットダウンされたときにdb接続を閉じますか? – kaufmanu

+0

実行スレッドの[秩序シャットダウン](http://stackoverflow.com/questions/3332832/graceful-shutdown-of-threads-and-executor)を検索して、役立つことを願っています。 – raffian

+0

私はそれをチェックしました、ありがとう!私は完全に私を助けませんでした...私は、ThreadLocalに格納されているすべてのdb接続のdbconnection.close()を呼び出す方法を探しています。私は本当にこれにアプローチする方法がわからない...つまり、スレッドプールがシャットダウンされると、db接続は自動的に閉じられません。 – kaufmanu

答えて

10

SocketBlockingQueueに入れることが考えられます。その後、Socketが必要なときは、スレッドはtake()からキューに入り、完了するとSocketとなり、それはput()に戻ります。

public void run() { 
    Socket socket = socketQueue.take(); 
    try { 
     // use the socket ... 
    } finally { 
     socketQueue.put(socket); 
    } 
} 

これは、追加の利点があります。

  • あなたが戻ってExecutorServiceコードを使用することに行くことができます。
  • 結果の処理とソケット通信を分離することができます。
  • 処理スレッドとソケットに1対1の対応は必要ありません。しかし、ソケット通信は作業の98%になる可能性があります。
  • 完了してExecutorServiceが完了したら、ソケットをデキューして閉じるだけで、ソケットをシャットダウンできます。

BlockingQueueのオーバーヘッドが追加されますが、Socketの通信を行っている場合は通知されません。

私たちは、私はあなたがスレッド地元の人々を使用した場合、あなたがこの作品を作ることができると思う

... ThreadFactoryが私たちのニーズに対応して信じていません。あなたのスレッドファクトリは、最初にソケットを開き、それをスレッドローカルに格納したスレッドを作成し、 Runnable argを呼び出し、ソケット内のすべての作業を行い、 ExecutorService内部キューからジョブをデキューします。一度それが完了したら、 arg.run()メソッドは終了し、スレッドローカルからソケットを取得して閉じます。

次のようなものです。ちょっと面倒ですが、あなたはそのアイデアを得るべきです。

ExecutorService threadPool = 
    Executors.newFixedThreadPool(10, 
     new ThreadFactory() { 
     public Thread newThread(final Runnable r) { 
      Thread thread = new Thread(new Runnable() { 
       public void run() { 
        openSocketAndStoreInThreadLocal(); 
        // our tasks would then get the socket from the thread-local 
        r.run(); 
        getSocketFromThreadLocalAndCloseIt(); 
       } 
      }); 
      return thread; 
     } 
     })); 

だからあなたのタスクはRunnableを実装し、次のようになります。作業コードスニペット用

public SocketWorker implements Runnable { 
    private final ThreadLocal<Socket> threadLocal; 
    public SocketWorker(ThreadLocal<Socket> threadLocal) { 
     this.threadLocal = threadLocal; 
    } 
    public void run() { 
     Socket socket = threadLocal.get(); 
     // use the socket ... 
    } 
} 
+1

いいえ。「ThreadFactory」はスレッドごとに1回だけ呼び出されます。したがって、あなたのスレッドプールに10個のスレッドがある場合、そのプールはプールが初期化され、スレッドが開始されるときにのみ作成されます。 @SAFXのすべてのタスクではありません。 – Gray

+0

@SAFXを覚えておいて、newThreadメソッドのrun()メソッドはタスクが実行されていません。これは 'ExecutorService'がタスクをデキューして実行するのに使うrunメソッドです。 – Gray

+0

はい、いくらか@SAFXです。 'r.run()'は並行コードの内部で実行可能です。内部タスクブロックキューからタスクをデキューし、タスクの 'run()'(例外をキャッチするなど)を呼び出します。私は答えにサンプルタスククラスを追加します。 – Gray

5

私はあなたがそれぞれのスレッドが独自のソケットを取得ThreadLocal

package com.stackoverflow.q16680096; 

import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

public class Main 
{ 
    public static void main(String[] args) 
    { 
     ExecutorService pool = Executors.newCachedThreadPool(); 
     int nrOfConcurrentUsers = 100; 
     for(int i = 0; i < nrOfConcurrentUsers; i++) 
     { 
      pool.submit(new InitSocketTask()); 
     } 

     // do stuff... 

     pool.submit(new Task()); 
    } 
} 

package com.stackoverflow.q16680096; 

import java.net.Socket; 

public class InitSocketTask implements Runnable 
{ 
    public void run() 
    { 
     Socket socket = SocketPool.get(); 
     // Do initial setup here 
    } 

} 

package com.stackoverflow.q16680096; 

import java.net.Socket; 

public final class SocketPool 
{ 
    private static final ThreadLocal<Socket> SOCKETS = new ThreadLocal<Socket>(){ 
     @Override 
     protected Socket initialValue() 
     { 
      return new Socket(); // Pass in suitable arguments here... 
     } 
    }; 

    public static Socket get() 
    { 
     return SOCKETS.get(); 
    } 
} 

package com.stackoverflow.q16680096; 

import java.net.Socket; 

public class Task implements Runnable 
{ 
    public void run() 
    { 
     Socket socket = SocketPool.get(); 
     // Do stuff with socket... 
    } 
} 

を使うべきだと思います。

+0

Socket ThreadLocalを作成するとスレッドに属し、作業内のソケットに直接アクセスできる可能性があります。ソケットを起動するInitTasksでThreadPoolをウォームアップするようにしてください。新しいタスクが実行されると、ThreadPoolのすべてのスレッドはソケットを準備します。 – jontejj

+1

作業コードで答えを更新しました。 – jontejj

+0

元の質問をあなたの例の作業コードで更新しました。あなたが 'SocketPool'と呼ぶものは自分のコードで' SocketTask'という名前ですが、どちらもローカルソケットオブジェクトを取得するための静的な 'ThreadLocal 'オブジェクトを持っています。 – raffian

関連する問題