2016-10-05 5 views
0

私はSpringのバッチジョブを分割ステップで構成し、分割ステップはチャンクで処理しています。SpringバッチのWriterからRunnableを起動しました。分割されたステップ

さらに、新しいスレッド(Runnableの実装)をメソッドpublic void write(List<? extends VO> itemsToWrite)から起動できますか?

は基本的に、作家は、ここでのLuceneを使用してインデックスを書き込み、書き込みがListchunk-sizeの項目を有しているので、私はListそのセグメントに分割し、新しいRunnableに各セグメントを通過すると考え。

これは良いアプローチですか?

私はサンプルをコーディングしましたが、ほとんどの場合は動作しますが、何度か立ち往生してしまいます。

私が心配する必要があることはありますか?またはこれを達成するために春のバッチに何かが組み込まれていますか?

チャンク全体に対して1つのスレッドで書き込みを行うことは望ましくありません。私はさらにチャンクを分割したいと思う。

のLucene IndexWriterは、スレッドセーフであるとアプローチはhere

サンプルコードをリストされている - ライターは、私は、スレッドプールからスレッドを開いている項目のListを取得しますか?私は、チャンクのために終了するプール、

@Override 
    public void write(List<? extends IndexerInputVO> inputItems) throws Exception { 


     int docsPerThread = Constants.NUMBER_OF_DOCS_PER_INDEX_WRITER_THREADS; 
     int docSize = inputItems.size(); 
     int remainder = docSize%docsPerThread; 
     int poolSize = docSize/docsPerThread; 

     ExecutorService executor = Executors.newFixedThreadPool(poolSize+1); 


     int fromIndex=0; 
     int toIndex = docsPerThread; 

     if(docSize < docsPerThread){ 
      executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems)); 
     }else{ 
      for(int i=1;i<=poolSize;i++){ 
       executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex))); 
       fromIndex+=docsPerThread; 
       toIndex+=docsPerThread; 
      } 

      if(remainder != 0){ 
       toIndex=docSize; 
       executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex))); 
      } 
     } 

     executor.shutdown(); 

     while(executor.isTerminated()){ 
      ; 
     } 

答えて

0

を待つ場合でも、私はライターで新しいスレッドを起動すると、それは良いアイデアだとよく分からない任意の懸念があるでしょう。 これらのスレッドは春バッチフレームワークの対象外です。したがって、上記のシャットダウンポリシーとキャンセルポリシーを実装する必要があります。 1つのセグメントの処理に失敗すると、キュー全体が失敗する可能性があります。公式ドキュメントpassingDataToFutureSteps

で説明したように、私は作家から次のステップにリストのカスタムセグメントを促進するために提案することができます別のアプローチとして

関連する問題