2012-04-24 20 views
0

私は10人の労働者と、以下の二つの方法の一覧のコードを持っている:消費者や労働者のパターン

public void demoDeques() { 
     int maxSizeOfJobDeque = 3; 
     Producer producer = new ProducerImpl(maxSizeOfJobDeque); 

     Logger.debug("WorkFlowEngineImpl : " + 
       "Creating Workers and adding them to allocator"); 
     List<Worker> workerList = buildWorkerList(producer); 
     Logger.debug("WorkFlowEngineImpl : " + 
       "Assigning some jobs to the workers. " + 
        "The workers have not been started yet"); 

     for (int i=1; i<4; i++) { 
      producer.assign(new JobImpl("job " + i, i)); 
      try { 
       Thread.sleep(4000); 
      } catch(InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 

     Logger.debug("WorkFlowEngineImpl : " + "Starting the workers"); 
     startWorkersAndWait5Seconds(workerList); 
     Logger.debug("WorkFlowEngineImpl : " + 
       "Assigning some more jobs to the " + 
        "started workers"); 

     for (int i=4; i<7; i++) { 
      producer.assign(new JobImpl("Job " + i, i)); 
      try { 
       Thread.sleep(4000); 
      } catch(InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 

     Logger.debug("WorkFlowEngineImpl : " + "Assigning More Jobs"); 
     for (int i=7; i<11; i++) { 
      producer.assign(new JobImpl("job" + i, i)); 
      try { 
       Thread.sleep(4000); 
      } catch(InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

プロデューサー:

public synchronized void assign(Job job) { 
    Set<Worker> workerSet = jobMap.keySet(); 
    LinkedBlockingDeque<Job> jobQueue; 
    StringBuffer sb; 

    for (Worker worker : workerSet) { 
     jobQueue = jobMap.get(worker); 

     sb = new StringBuffer(); 
     sb.append("Assigning job "); 
     sb.append(job.getJobNumber()); 
     sb.append(" to "); 
     sb.append(worker); 
     sb.append("'s jobs Deque"); 

     Logger.debug("Producer : " + sb.toString()); 

     if (! jobQueue.offerFirst(job)) { 
      jobQueue.pollLast(); 
      jobQueue.offerFirst(job); 
     } 
    } 
} 

私は私が持っているように、2つの方法を変更しようとしていますアロケータは100のジョブのリストを有し、100人に達するまで時間がたつにつれて10人の作業者のそれぞれに最大3つのジョブが与えられるように割り当てる。すなわち、 6になるので、10人の作業者に到達すると、3人の作業を割り当てた作業者1に戻って、100番目の作業に達するまで停止し、すべてのジョブが割り当てられていることを警告します。助けを借りて助けてください。

+2

何らかの順序でコードを入手してください。それは解析さえしない。 –

+0

あなたは、その特定の操作命令を望むことについて、もう少し説明することはできますか? ExecutorServiceを使用してジョブを処理するだけの理由は何ですか? – Gray

+0

@Grayはい宿題の問題です。私は、労働者を通して、また職場を通して何らかのループを作り出すと思っていました。私を「ExecutorService」の方向にうまく行き渡らせるには、優先順位はありません。 – CCharles

答えて

1

私はあなたの仕事の割り当て方法には反対しています。私はあなたのコードを掛けていると思います。

一般に、プロデューサは作業を作成してキューに割り当てる必要があります。雇用を減らすことを心配するのは、生産者の責任ではありません。作業員が利用可能になると、キューからジョブが引き出されます。彼らが1つの仕事か3つを引くかどうかは、それが引っ張ってくると単にワーカースレッド内の論理の問題です。

おそらく、キューのシングルトンクラスを作成したいと思うでしょう。受信ジョブと要求ジョブを処理する必要があります。

このようにアプローチすると、生産者から作業者を切り離すことができます。ワークロード/ボトルネックの指示に従って、プロデューサまたはワーカーを追加できます。また、労働者がプロデューサーに作業を進める準備ができていることを知らせる、何らかの信号メカニズムを生成しないようにします。最後に、これにより、あなたはdemoDequeue内の所定の位置にあるマジックナンバーを使って、ジョブが1つしか利用できないか、ジョブの番号が順不同になってしまうようなさまざまな競合状態を心配する必要がなくなります。

HTH申し訳ありませんが、直接的な回答ではありませんが、問題を解決するためのより良い方法であなたを得る必要があります。

+1

+1スレッドマイクロ管理の回避を提案するために、高度に熟練した開発者が試してみても、ほとんどいつもひどく終わるものです。 –

+0

@ MartinsJames - ありがとう。スレッドの管理が間違っていると、間違いなく次のような苦労があります。 – GlenH7

2

あなたの質問には特定の実行順序が記載されていますが、あなたのコメントはこれが真の要件ではないことを示しているようです。独自のスレッドプールを実装する代わりに、Java 1.5で提供されるExecutorsを使用することをお勧めします。たとえば:

ExecutorService threadPool = 
    Executors.newFixedThreadPool(NUMBER_THREADS_TO_RUN_JOBS); 
// your producers then just have to submit jobs to the pool 
for (int i=1; i<4; i++) { 
    threadPool.submit(new JobImpl("Job " + i, i)); 
} 

私は処理がそれぞれの「仕事」ではなくJobImplが動作するように上記のコードのためRunnableを実装する必要がある場合に発生する必要がまさにあなたの例から見ることができません。仕事の価値を返す必要がある場合は、JobImplCallableに変更し、call()によって返される結果を得るためにメソッドによって返されたFutureを使用して返すことができます。

+0

My JobImplの外観は次のようになります。 public class JobImplはジョブ{ を実装しています。 private int jobNumber; 公共JobImpl(文字列ジョブ、INT JOBNUMBER){ this.job =ジョブ。 this.jobNumber = jobNumber; }パブリック文字列getJob(){ 戻りthis.job。 }公共のint getJobNumber(){ リターンthis.jobNumber。 }} – CCharles

+0

OK私の主なクラスがあります 'ExecutorServiceのスレッドプール= \t \t Executors.newFixedThreadPool(1)。 { threadPool.submit(新しいJobImpl( "ジョブ"、I)();私は4 CCharles

+0

しかし、私は私のJobImplがjdeque返却する必要があります: '公共JobImpl(文字列ジョブ、int型JOBNUMBER){ this.job =仕事を。 this.jobNumber = jobNumber; } ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(新しいJobImpl(ジョブ、JOBNUMBER){ ます。public void実行(){ \t文字列のJ =持つInteger.toString(JOBNUMBER); \tのDeque jdeque =新しいLinkedListの(); \t \t jdeque.add (ジョブ+ j)は、\t \t \t}})。 今後の未来= executorService.submit(新しいJobImpl(jdeque){; } 公共JobImplコール()は例外{ 戻りjdequeをスロー}); ' – CCharles