2017-08-17 8 views
0

Springバッチのローカルパーティショニングステップを使用してEmployeeテーブル10レコードをNewEmployeeテーブルに移動するPOCプロジェクトを作成しました。このバッチプロセスを実行するために4つのスレッドを構成しました。 このバッチ処理を実行すると、pagingItemReader()メソッドがスレーブによって呼び出されないことがわかりました。このOraclePagingQueryProviderへの呼び出しは呼び出されません。 私が見逃した番号レコード(移動していない)は、構成されたスレッドの数に等しいことに気付きました。 リンク以下から指導を取って、このPOC Iを開発しました: - 私は通常の読み出し、処理および無マルチスレッドが関与していない書き込みロジックと、マスタとスレーブのコードを交換するときのコードの下に正常に動作しますので、予めご了承くださいhttps://github.com/mminella/LearningSpringBatch/tree/master/src/localPartitioningSpringバッチJDBCPagingItemReaderがslaveStepから呼び出されていません

を。

DB内のBATCH_STEP_EXECUTIONテーブルにも、8レコードしか移動しなかったことが示されています(ここでは2レコードが再度スレッド数に等しい)。次のようにDBレコードは言う: -

STEP_NAMEステータスCOMMIT_COUNT READ_COUNT WRITE_COUNT EXIT_CODE slaveStep:パーティション1はslaveStep を完了し1 4 4を完了した:partition0ではmasterStepが

のコードスニペットを終え2 8 8が完了 を完了し1 4 4を完了しましたConfigurationクラス

  @Bean 
       public JobRegistryBeanPostProcessor jobRegistrar() throws Exception{ 
        JobRegistryBeanPostProcessor registrar=new JobRegistryBeanPostProcessor(); 
        registrar.setJobRegistry(this.jobRegistry); 
        registrar.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory()); 
        registrar.afterPropertiesSet(); 
        return registrar; 
       } 

       @Bean 
       public JobOperator jobOperator() throws Exception{ 
        SimpleJobOperator simpleJobOperator=new SimpleJobOperator(); 
        simpleJobOperator.setJobLauncher(this.jobLauncher); 
        simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter()); 
        simpleJobOperator.setJobRepository(this.jobRepository); 
        simpleJobOperator.setJobExplorer(this.jobExplorer); 
        simpleJobOperator.setJobRegistry(this.jobRegistry); 

        simpleJobOperator.afterPropertiesSet(); 
        return simpleJobOperator; 

       } 

       @Bean 
       public ColumnRangePartitioner partitioner() { 
        ColumnRangePartitioner partitioner = new ColumnRangePartitioner(); 
        partitioner.setColumn("id"); 
        partitioner.setDataSource(this.dataSource); 
        partitioner.setTable("Employee"); 
        LOGGER.info("partitioner---->"+partitioner); 
        return partitioner; 
       } 

       @Bean 
       public Step masterStep() { 
        return stepBuilderFactory.get("masterStep") 
          .partitioner(slaveStep().getName(), partitioner()) 
          .step(slaveStep()) 
          .gridSize(gridSize) 
          .taskExecutor(taskExecutorConfiguration.taskExecutor()) 
          .build(); 
       } 

       @Bean 
       public Step slaveStep() { 
        return stepBuilderFactory.get("slaveStep") 
          .<Employee, NewEmployee>chunk(chunkSize) 
          .reader(pagingItemReader(null,null)) 
          .processor(employeeProcessor()) 
          .writer(employeeWriter.customItemWriter()) 
          .build(); 
       } 

       @Bean 
       public Job job() { 
        return jobBuilderFactory.get("FR") 
          .start(masterStep()) 
          .build(); 
       } 

       @Bean 
       public ItemProcessor<Employee, NewEmployee> employeeProcessor() { 
        return new EmployeeProcessor(); 
       } 

       @Override 
       public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { 
        this.applicationContext=applicationContext; 
       } 


       */ 

       @Bean 
       @StepScope 
       public JdbcPagingItemReader<Employee> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minvalue, 
         @Value("#{stepExecutionContext['maxValue']}") Long maxvalue) { 

        JdbcPagingItemReader<Employee> reader = new JdbcPagingItemReader<Employee>(); 
        reader.setDataSource(this.dataSource); 
        // this should be equal to chunk size for the performance reasons. 
        reader.setFetchSize(chunkSize); 
        reader.setRowMapper((resultSet, i) -> { 
         return new Employee(resultSet.getLong("id"), 
           resultSet.getString("firstName"), 
           resultSet.getString("lastName")); 
        }); 

        OraclePagingQueryProvider provider = new OraclePagingQueryProvider(); 
        provider.setSelectClause("id, firstName, lastName"); 
        provider.setFromClause("from Employee"); 
        LOGGER.info("min-->"+minvalue); 
        LOGGER.info("max-->"+maxvalue); 
        provider.setWhereClause("where id<=" + minvalue + " and id > " + maxvalue); 

        Map<String, Order> sortKeys = new HashMap<>(1); 
        sortKeys.put("id", Order.ASCENDING); 
        provider.setSortKeys(sortKeys); 

        reader.setQueryProvider(provider); 
        LOGGER.info("reader--->"+reader); 
        return reader; 
       } 

     @Override 
     public Map<String, ExecutionContext> partition(int gridSize) { 
      int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class); 
      int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class); 
      int targetSize = (max - min)/gridSize + 1; 

      Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>(); 
      int number = 0; 
      int start = min; 
      int end = start + targetSize - 1; 

      while (start <= max) { 
       ExecutionContext value = new ExecutionContext(); 
       result.put("partition" + number, value); 

       if (end >= max) { 
        end = max; 
       } 
       LOGGER.info("Start-->" + start); 
       LOGGER.info("end-->" + end); 
       value.putInt("minValue", start); 
       value.putInt("maxValue", end); 
       start += targetSize; 
       end += targetSize; 
       number++; 
      } 

      return result; 
     } 

ColumnRangePartitionerクラスのコードスニペット: -

int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class); 
    int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class); 
    int targetSize = (max - min)/gridSize + 1; 

    Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>(); 
    int number = 0; 
    int start = min; 
    int end = start + targetSize - 1; 

    while (start <= max) { 
     ExecutionContext value = new ExecutionContext(); 
     result.put("partition" + number, value); 

     if (end >= max) { 
      end = max; 
     } 
     LOGGER.info("Start-->" + start); 
     LOGGER.info("end-->" + end); 
     value.putInt("minValue", start); 
     value.putInt("maxValue", end); 
     start += targetSize; 
     end += targetSize; 
     number++; 
    } 

    return result; 
+0

をスレーブステップを呼び出し、ここ

 stepBuilderFactory .get("userMasterStep") .partitioner(userSlaveStep().getName(), userPartitioner()) .partitionHandler(userMasterSlaveHandler()) .build(); 

をpartitionHandlerコードを追加します。あなたは、 'Partitioner'パーティションを返していることを確認したことがありますか?ワーカー・ステップ実行レコードはジョブ・リポジトリーに入っていますか? –

+0

こんにちはMichael-返信ありがとうございます。 BATCH_STEP_EXECUTIONテーブルレコードで質問を更新しました。ここでは、読んだ読者がソーステーブルから第2列のデータを読み取っていない間に、最初の列の値を次の値にコピーしています。 2番目の列と同じものが宛先テーブルに保存されています。パーティションロジックを削除した場合、これらの問題は発生しません。ご協力いただきありがとうございます。 – Abhilash

答えて

0

この問題の解決策が見つかりました。 partitionerの後にmasterStepにpartitionHandlerを追加する必要があります。 partitionHandlerでは、slaveStepとその他​​の設定を定義します。以下はコードスニペットです。

MasterStep: - partitionHandlerという名前の別のBeanを定義し、ここで

@Bean 
public PartitionHandler userMasterSlaveHandler() throws Exception { 
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler(); 
    handler.setGridSize(gridSize); 
    handler.setTaskExecutor(taskExecutorConfiguration.taskExecutor()); 
    handler.setStep(userSlaveStep()); 
    handler.afterPropertiesSet(); 
    return handler; 
} 
関連する問題