2016-12-08 3 views
1

私は毎日特定の時間に実行する仕事があります。これには2つのステップがあり、それぞれにreader()とprocessor()の部分で休憩呼び出しが設定されています。リソースとは、MySQL DBに格納されているアカウント番号です。だから、バッチバッチジョブは正常に動作し、私たちは期待される出力を得ています。しかし、1つのスレッドでのみ実行されます。私はそれを並列化しようとしましたが、ドキュメンテーションといくつかの例を経て、しばらくしてこの特定のものを使用しました。example。ここにJavaの私の仕事の構成コードです。ステップは、Task Executorを使用して1つのスレッドでのみ実行されます。

@Bean 
public TaskExecutor taskExecutor() { 
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); 
    taskExecutor.setMaxPoolSize(60); 
    taskExecutor.afterPropertiesSet(); 
    return taskExecutor; 
} 

@Bean 
public Job processJob(BatchListener listener) { 
    return jobBuilderFactory.get("Job").incrementer(new RunIdIncrementer()).listener(listener) 
      .flow(processStep1()).on("*").to(Step2()).end().build(); 
} 
@Bean 
public Step processStep1() { 
    return (Step) stepBuilderFactory.get("Step1") 
      .<response, response>chunk(3). 
      reader(getItemReader()). 
      processor(getItemProcess()). 
      writer(getItemWriter()). 
      taskExecutor(taskExecutor()). 
      throttleLimit(2). 
      build(); 

} 


@Bean 
public Step processStep2() { 

    SimpleStepBuilder<AccountResponse,batch_details> process = stepBuilderFactory.get("processStep2") 
      .<AccountResponse,batch_details>chunk(5).reader(getBatchReader()).processor(getBatchProcessor()); 
    return process.writer(getBatchWriter()).build(); 

} 

この設定は、タスク実行プログラムが設定されていても、1つのスレッドでのみ実行されます。別のスレッドで実行するために、間違っていたり、行方不明になっていることを助けてくれますか?私はstep1とstep2を並列化したいと思います。データの並行性は問題ではありません。ステップ1を並列化すると、ステップ2を複製します。ありがとうございました。

出力例:私はチャンクサイズを変更した場合

Thread # 37 is doing this task 
Hibernate: Select * from batch_details where status != 'complete' and  session_id = '' and status != 'in_solve' ORDER BY RAND() LIMIT 3 
Hibernate: update batch_details set status = 'in_cs' where account_id= ? 
Hibernate: update batch_details set session_completion_time=?, session_id=?,  status=? where account_id=? 
accountnumber1 

Thread # 37 is doing this task 
Hibernate: Select * from batch_details where status != 'complete' and session_id = '' and status != 'in_solve' ORDER BY RAND() LIMIT 3 
Hibernate: update batch_details set status = 'in_cs' where account_id= ? 
accountnumber2 

そして、別の質問では、読者は、チャンクサイズの回繰り返すが、同じスレッドでされています。なぜこれが起こっているのか説明することができれば、この段階でこれが何を意味するのか理解できません。

答えて

1

ジョブには並列フローが組み込まれていません。

// helper method to create a split flow out of a List of steps 
private static Flow createParallelFlow(List<Step> steps) { 
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); 
    // -1 indicates no concurrency limit at all, steps.size is in this case 2 threads, 1 means just 1 thread. 
    taskExecutor.setConcurrencyLimit(steps.size()); 

    List<Flow> flows = steps.stream() // we have to convert the steps to a flows 
     .map(step -> // 
       new FlowBuilder<Flow>("flow_" + step.getName()) // 
       .start(step) // 
       .build()) // 
      .collect(Collectors.toList()); 

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) // 
     .add(flows.toArray(new Flow[flows.size()])) // 
     .build(); 
} 

あなたの仕事を:現在、それだけでthis questionでHansjoerg Wingeierは、ヘルパー・メソッドのセットと並行してステップを実行するための良い方法を与えた順次ステップ1とステップ1.

の完了時にステップ2を実行します

@Bean 
public Job myJob() { 

    List<Step> steps = new ArrayList<>(); 
    steps.add(processStep1); 
    steps.add(processStep2); 

    return jobBuilderFactory.get("yourJobName")    
     .start(createParallelFlow(steps));     
     .end() 
     .build(); 
    } 
+0

@Sander_MThanksのようになります。 – arbit

関連する問題