Springバッチのローカルパーティショニングステップを使用してEmployeeテーブル10レコードをNewEmployeeテーブルに移動するPOCプロジェクトを作成しました。このバッチプロセスを実行するために4つのスレッドを構成しました。 このバッチ処理を実行すると、pagingItemReader()メソッドがスレーブによって呼び出されないことがわかりました。このOraclePagingQueryProviderへの呼び出しは呼び出されません。 私が見逃した番号レコード(移動していない)は、構成されたスレッドの数に等しいことに気付きました。 リンク以下から指導を取って、このPOC Iを開発しました: - 私は通常の読み出し、処理および無マルチスレッドが関与していない書き込みロジックと、マスタとスレーブのコードを交換するときのコードの下に正常に動作しますので、予めご了承くださいhttps://github.com/mminella/LearningSpringBatch/tree/master/src/localPartitioningSpringバッチJDBCPagingItemReaderがslaveStepから呼び出されていません
を。
DB内のBATCH_STEP_EXECUTIONテーブルにも、8レコードしか移動しなかったことが示されています(ここでは2レコードが再度スレッド数に等しい)。次のようにDBレコードは言う: -
STEP_NAMEステータスCOMMIT_COUNT READ_COUNT WRITE_COUNT EXIT_CODE slaveStep:パーティション1はslaveStep を完了し1 4 4を完了した:partition0ではmasterStepが
のコードスニペットを終え2 8 8が完了 を完了し1 4 4を完了しましたConfigurationクラス
@Bean
public JobRegistryBeanPostProcessor jobRegistrar() throws Exception{
JobRegistryBeanPostProcessor registrar=new JobRegistryBeanPostProcessor();
registrar.setJobRegistry(this.jobRegistry);
registrar.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());
registrar.afterPropertiesSet();
return registrar;
}
@Bean
public JobOperator jobOperator() throws Exception{
SimpleJobOperator simpleJobOperator=new SimpleJobOperator();
simpleJobOperator.setJobLauncher(this.jobLauncher);
simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter());
simpleJobOperator.setJobRepository(this.jobRepository);
simpleJobOperator.setJobExplorer(this.jobExplorer);
simpleJobOperator.setJobRegistry(this.jobRegistry);
simpleJobOperator.afterPropertiesSet();
return simpleJobOperator;
}
@Bean
public ColumnRangePartitioner partitioner() {
ColumnRangePartitioner partitioner = new ColumnRangePartitioner();
partitioner.setColumn("id");
partitioner.setDataSource(this.dataSource);
partitioner.setTable("Employee");
LOGGER.info("partitioner---->"+partitioner);
return partitioner;
}
@Bean
public Step masterStep() {
return stepBuilderFactory.get("masterStep")
.partitioner(slaveStep().getName(), partitioner())
.step(slaveStep())
.gridSize(gridSize)
.taskExecutor(taskExecutorConfiguration.taskExecutor())
.build();
}
@Bean
public Step slaveStep() {
return stepBuilderFactory.get("slaveStep")
.<Employee, NewEmployee>chunk(chunkSize)
.reader(pagingItemReader(null,null))
.processor(employeeProcessor())
.writer(employeeWriter.customItemWriter())
.build();
}
@Bean
public Job job() {
return jobBuilderFactory.get("FR")
.start(masterStep())
.build();
}
@Bean
public ItemProcessor<Employee, NewEmployee> employeeProcessor() {
return new EmployeeProcessor();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext=applicationContext;
}
*/
@Bean
@StepScope
public JdbcPagingItemReader<Employee> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minvalue,
@Value("#{stepExecutionContext['maxValue']}") Long maxvalue) {
JdbcPagingItemReader<Employee> reader = new JdbcPagingItemReader<Employee>();
reader.setDataSource(this.dataSource);
// this should be equal to chunk size for the performance reasons.
reader.setFetchSize(chunkSize);
reader.setRowMapper((resultSet, i) -> {
return new Employee(resultSet.getLong("id"),
resultSet.getString("firstName"),
resultSet.getString("lastName"));
});
OraclePagingQueryProvider provider = new OraclePagingQueryProvider();
provider.setSelectClause("id, firstName, lastName");
provider.setFromClause("from Employee");
LOGGER.info("min-->"+minvalue);
LOGGER.info("max-->"+maxvalue);
provider.setWhereClause("where id<=" + minvalue + " and id > " + maxvalue);
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
provider.setSortKeys(sortKeys);
reader.setQueryProvider(provider);
LOGGER.info("reader--->"+reader);
return reader;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class);
int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class);
int targetSize = (max - min)/gridSize + 1;
Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
int number = 0;
int start = min;
int end = start + targetSize - 1;
while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if (end >= max) {
end = max;
}
LOGGER.info("Start-->" + start);
LOGGER.info("end-->" + end);
value.putInt("minValue", start);
value.putInt("maxValue", end);
start += targetSize;
end += targetSize;
number++;
}
return result;
}
ColumnRangePartitionerクラスのコードスニペット: -
int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class);
int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class);
int targetSize = (max - min)/gridSize + 1;
Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
int number = 0;
int start = min;
int end = start + targetSize - 1;
while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if (end >= max) {
end = max;
}
LOGGER.info("Start-->" + start);
LOGGER.info("end-->" + end);
value.putInt("minValue", start);
value.putInt("maxValue", end);
start += targetSize;
end += targetSize;
number++;
}
return result;
をスレーブステップを呼び出し、ここ
をpartitionHandlerコードを追加します。あなたは、 'Partitioner'パーティションを返していることを確認したことがありますか?ワーカー・ステップ実行レコードはジョブ・リポジトリーに入っていますか? –
こんにちはMichael-返信ありがとうございます。 BATCH_STEP_EXECUTIONテーブルレコードで質問を更新しました。ここでは、読んだ読者がソーステーブルから第2列のデータを読み取っていない間に、最初の列の値を次の値にコピーしています。 2番目の列と同じものが宛先テーブルに保存されています。パーティションロジックを削除した場合、これらの問題は発生しません。ご協力いただきありがとうございます。 – Abhilash