2016-06-19 2 views
0

私は春バッチチャンクベースのジョブを(カスタムリーダー、カスタムプロセッサー、カスタムライターにする必要があります。私が10個のスレッドを開始する場合、それらは並行して読み込み、処理、書込みの順番で実行する必要があります。Springバッチ:チャンク処理をマルチスレッド化するには

スレッド1 - >ためには:読み取り、プロセスは、

スレッド2書き込み - >ためには:読み取り、プロセス、書く...など

この動作を実装する方法を。 現在、以下のようにシングルスレッドモデルで動作するコードがあります。 (私は思う。わからない)。 私はバッチでコミット間隔をしたい:チャンクは '1'のみ(私はファイルを1行ずつ読む必要があるため)。

<!-- Reader bean for our simple Text file --> 
<bean id="productReader" class="com.mycompany.batching.reader.productItemReader" 
    scope="step"> 
    <property name="userId" value="#{jobParameters['userId']}" /> 
    <property name="dataId" value="#{jobParameters['dataId']}" /> 
</bean> 


<!-- Processor for the data red from text file --> 
<bean id="productProcessor" class="com.mycompany.batching.processor.productItemProcessor"> 
</bean> 

<!-- Persisting data to database --> 
<bean id="productWriter" class="com.mycompany.batching.writer.productItemWriter"> 
</bean> 

<batch:job id="simpleProductImportJob" xmlns="http://www.springframework.org/schema/batch"> 
    <batch:step id="importFileStep"> 
     <batch:tasklet> 
      <batch:chunk reader="productReader" processor="productProcessor" 
       writer="productWriter" commit-interval="1" /> 
     </batch:tasklet> 
    </batch:step> 
</batch:job> 

これをマルチスレッド化する方法?

+0

おそらく分割されたステップが必要です。http://docs.spring.io/spring-batch/reference/html/scalability.html –

答えて

0

これを達成する最善の方法はステップパーティショニングです。 ソースがファイル であると仮定します。1.ロジックに基づいてファイルを分割します。 2.パーティション作成者がファイルを選択し、各ファイルをスレーブに割り当てます。

バッチのcontext.xml

<batch:step id="fileProcessMaster"> 
     <batch:partition partitioner="filePartioner" step="readAndInsertToDb"> 
      <batch:handler task-executor="taskExecutor" /> 
     </batch:partition> 
    </batch:step> 

FilePartitioner.java

public class FilePartitioner implements Partitioner{ 

@Override 
public Map<String, ExecutionContext> partition(int gridSize) { 
    Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(); 
     int i = 0; 
     File[] files = new File(inboundDirectory).listFiles((FileFilter) new PrefixFileFilter(*); 
     for (File file : files) { 
      //Pick only those files which are splitted 
      if (file.getName().contains("Split")) { 
       ExecutionContext context = new ExecutionContext(); 
       context.putString("file", file.getAbsolutePath()); 
       map.put("slave" + i, context); 
       i++; 
      } 
     } 
     return map; 
} 

あなたはパーティのための豆の設定はスコープのステップであることを確認しています。