2017-09-26 11 views
0

私の質問は、私は1000レコードのデータセットがあります。私はこのようなデータを処理する3つのスレッド、 スレッド1をレコード1から300、スレッド2を301から600のようにしたい。 1つのスレッドは、一度に50レコードを要求してフェッチし、オブジェクトを作成してキューに入れることができます。 メインスレッドは同時にキューからデータを読み込みます。メインスレッドのコンシューマと他のスレッドのプロデューサ

以下はコードですが、私が直面している問題はrecordRead変数がレコードの読み込みを開始する場所の開始点を示していることです。 しかし、スレッドごとに異なる値を設定するにはどうすればいいですか?例えばthread1の場合は0、recordsToReadの場合は300、thread2の場合はrecordReadは300、recordsToReadの場合は300 + 300 = 600、最後のスレッドの場合は600となります。終わり。 pagesize = 50 pagesize、recordRead、recordToReadはすべてメインクラスとメインスレッドに属する変数です。

ExecutorService service = Executors.newFixedThreadPool(nThreads); 
    while(nThreads > 0) { 
     nThreads--; 
     service.execute(new Runnable() { 

      @Override 
      public void run() { 
       // TODO Auto-generated method stub 

       do { 
        int respCode = 0; 
        int RecordsToRead = div; 
        JSONObject jsObj = new JSONObject(); 
        jsObj.put("pagesize", pageSize); 
        jsObj.put("start", recordsRead); 
        jsObj.put("searchinternalid", searchInternalId); 

        try { 
         boolean status = req.invoke(jsObj); 
         respCode = req.getResponseCode(); 

        } catch (Exception e) {   
         req.reset(); 
         e.printStackTrace(); 
         return true; 
        } 
        JSONObject jsResp = req.getResponseJson(); 
        //here jsResp will be added to ArrayBlockingQueue. 

        req.reset(); 
       }while(!isError && !isMaxLimit && recordsRead < RecordsToRead); 

      } 

     }); 
    } 

このループの後、キューを読み取るメインスレッドのコードになります。 すべてのスレッドに対してrecordsReadとrecordToreadを設定する方法はありますか。

少なくとも1つのスレッドがオブジェクトをキューに挿入するまでメインスレッドを待機させる方法。

+0

'Runnable'のサブクラスを、クラスctorパラメータとして開始位置(およびその他のもの)を使って作成できます。 –

答えて

0

あなたには2つの問題があります。第1の問題は、並列チャンク計算を実行することであり、第2の問題は、これから連続したパイプラインを生成することである。最初の問題から始めることができます。あらかじめ定義されたサイズの並列計算を行うには、fmpvがfork-joinフレームワークを使用するのが最適なオプションです。パフォーマンス(仕事の盗みは本当に効果的です)だけでなく、シンプルなコードも原因です。しかし、私のために3つのスレッドに制限されているので、スレッドを直接使用することも有効です。あなたが望むだけで、私はこの方法で実装することができます:

final int chunkSize = 300; 
    //you can also use total amount of job 
    //int totalWork = 1000 and chunk size equals totalWork/threadsNumber 
    final int threadsNumber = 3; 

    Thread[] threads = new Thread[threadsNumber]; 

    for (int ii = 0; ii < threadsNumber; ii++) { 
     final int i = ii; 

     threads[ii] = new Thread(() -> { 
      //count your variable according the volume 
      // for example you can do so 
      int chunkStart = i * chunkSize; 
      int chunkEnd = chunkStart + chunkSize; 
      for(int j = chunkStart; j < chunkEnd; j++) { 
       //object creation with necessary proprs 
       //offer to queue here 
      } 
     }); 

     threads[ii].start(); 
    } 

    //your code here 
    //take here 

    for (int ii = 0; ii < threadsNumber; ii++) { 
     try { 
     //this part is only as example 
     //you do not need it     
     //here if you want you can also w8 for completion of all threads 
      threads[ii].join(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

ここで、消費に関する第2の問題について説明します。この子は、たとえばConcurrentLinkedBlockingQueue(http://www.jgroups.org/javadoc/org/jgroups/util/ConcurrentLinkedBlockingQueue.html)を使用できます。プロデューサスレッドでオファーを行い、メインでテイクメソッドを使用します。

しかし、正直言って私はまだあなたの問題の理由を得ていませんでした。あなたは連続したパイプラインを作りたいのですか、それとも単なる計算ですか?

また、このコースを受講することをお勧めします:https://www.coursera.org/learn/parallel-programming-in-java/home/welcome。 これは問題を正確に解決し、さまざまなソリューションを提供するのに役立ちます。同時および分散コンピューティングコースもあります。

関連する問題