2017-01-06 11 views
0

私は大きなキューを持っていると言います。10,000個のオブジェクトのようなものです。キューが空になるまで、5つのワーカースレッドを持つスレッドプールを作成して、それぞれがキューからアイテムを削除して作業したいとします。キューからのマルチスレッドジョブ - ジョブが多すぎますか?

私はさまざまな場所で見た設定を使用して、すぐに10,000個のジョブを作成することになりますが、5人の作業者から実行することが心配です。私はこれが本当にスケーラブルではないように感じています - キューには既に10,000個のアイテムがあり、スタックにはさらに10,000のジョブがあります(積極的に実行されていなくてもメモリ問題のようです)。

これは、この回答が示唆しているようです:https://stackoverflow.com/a/9916299/774359 - それは私を心配する部分 "// now submit our jobs"です。キューをジョブに効果的にダンプするのは問題ですか?

ここで私がこれまで持っているものの簡単な例です:

メインで():

ワーカークラスで
ExecutorService executor = Executors.newFixedThreadPool(5); 
while(!hugeQueue.isEmpty()) { 
    String work = hugeQueue.remove(); 
    System.out.println("Creating job for " + work); 
    Runnable worker = new Worker(work); 
    executor.execute(worker); 
} 

public Worker(String itemFromQueue) { this.job = itemFromQueue; } 

@Override 
public void run() { 
    System.out.println("Working on " + this.itemFromQueue); 
    //Do actual work 
} 

hugeQueueが10,000番号が含まれている場合、私はすべての10,000の「ジョブの作成」メッセージとそれに続くすべての「作業中の」メッセージが表示されます。一度に5つのジョブしか作成されていない場合は、スレッドが開かれ、別のジョブが作成されてから動作するようになっています。そうすれば、スタックには10,000ジョブはありません。どうすればそれを達成できますか?私はこのアーキテクチャについて正しく考えていますか?


編集への回答に基づいて更新された情報が含まれていますsenequeのコードはstraightawayをコンパイルしていなかったので、私はいくつかのマイナーな変更を行っ@

は - 残念ながら、これの出力は、労働者だけの作成です、そして実際の仕事のどれも。メインで

():LongRunningWorkerで

int numOfThreads = 5; 
BlockingQueue<Integer> hugeQueue = new LinkedBlockingQueue<>(); 
for(int x = 0; x < 1000; x++) { hugeQueue.add(x); } 

ExecutorService executor = Executors.newFixedThreadPool(numOfThreads); 
LongRunningWorker longRunningWorker = new LongRunningWorker(); 

for(int i = 0; i < numOfThreads ; i++) { 
    System.out.println("Created worker #" + i); 
    executor.submit(longRunningWorker); 
} 
System.out.println("Done"); 

:労働者で

public class LongRunningWorker implements Runnable { 
    BlockingQueue<Integer> workQueue; 
    void spiderExmaple(BlockingQueue<Integer> workQueue) { 
     this.workQueue = workQueue; 
    } 

    @Override 
    public void run() { 
     try { 
      while(workQueue.poll(3, TimeUnit.SECONDS) != null) { 
       Integer work = workQueue.remove(); 
       System.out.println("Working on " + work); 
       new Worker(work).run(); 
      } 
     } catch (InterruptedException e) { e.printStackTrace(); } 
    } 
} 

public class Worker implements Runnable{ 
    Integer work; 
    Worker(Integer x) { this.work = x; } 

    @Override 
    public void run() { 
     System.out.println("Finished work on " + this.work); 

    } 
} 
+0

A ThreadPoolExecutorService:

BlockingQueue<String> hugeQueue = ... ExecutorService executor = Executors.newFixedThreadPool(5); LongRunningWorker longRunningWorker = new LongRunningWorker(hugeQueue); for(int i = 0; i < 5 ; i++) { executor.submit(longRunningWorker) } 

はその後LongRunningWorkerは次のように定義されています。そこでここでは、1つのキューから5つのスレッドによって読み取られる他のキューに入れます。 – seneque

+0

@seneque Right - これは、自分の10,000個のキューのキューに対して、同じサイズ、正しい?オブジェクトは異なっていますが、私の質問は、これが実行可能な解決策であるかどうかです.2倍のメモリが必要ですが、その代わりに – Jake

+0

、hugeQueueがブロッキングキューであれば、キューへの参照とキューからのポーリングを5スレッドにできます。 – seneque

答えて

1

一つの解決策は、あなたに5回のスレッドに直接ポーリングキューを持つことになります。内部キューを持っている(あなたは(5)Executors.newFixedThreadPoolを呼び出すときに作成されたものです)

class LongRunningWorker(BlockingQueue<String> workQueue) extends Runnable { 
    final BlockingQueue<String> workQueue; 
    LongRunningWorker(BlockingQueue<String> workQueue) { 
     this.workQueue = workQueue; 
    } 

    public void run() { 
     while((String work = workQueue.poll(3, TimeUnit.Second) != null) { 
      try { 
       new Worker(work).run(); 
      } catch (Exception e) { 
       // 
      } 
     } 
    } 
} 
+0

完璧な、まさに私が探していたもの。ありがとう! – Jake

+0

コードをチェックした後、コードがうまくいかないようです - 助言?それは労働者を創造したという事実だけを示していますが、仕事自体は決して始まらないのです。 – Jake

+0

新しいワーカー(仕事).run(); < - それで、それは仕事をすることになっている – seneque

関連する問題