2017-09-18 5 views
2

バネバッチで作業しています。私は、(オブジェクトのリストの)分割ステップを持っているし、リーダーとライターとのスレーブステップ。バックスバッチを使用してステップをパラレルモードで実行する方法

processStepをパラレルモードで実行します。 したがって、各パーティションに対して特定のReader-Writerのインスタンスを作成したいと考えています。

現在、作成されたパーティションは、Reader-Writerの同じインスタンスを使用します。したがって、これらの操作はシリアルモードで実行されます。最初のパーティションを読み書きし、最初のパーティションが完了したときに次のパーティションに対して同じ操作を行います。

春のブート構成クラス:

@Configuration 
@Import({ DataSourceConfiguration.class}) 
public class BatchConfiguration { 

    private final static int COMMIT_INTERVAL = 1; 

    @Autowired 
    private JobBuilderFactory jobBuilderFactory; 

    @Autowired 
    private StepBuilderFactory stepBuilderFactory; 

    @Autowired 
    @Qualifier(value="mySqlDataSource") 
    private DataSource mySqlDataSource; 

    public static int GRID_SIZE = 3; 

    public static List<Pojo> myList; 

    @Bean 
    public Job myJob() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception { 

     return jobBuilderFactory.get("myJob") 
      .incrementer(new RunIdIncrementer()) 
      .start(partitioningStep()) 
      .build(); 
    } 

    @Bean(name="partitionner") 
    public MyPartitionner partitioner() { 

    return new MyPartitionner(); 
    } 

    @Bean 
    public SimpleAsyncTaskExecutor taskExecutor() { 

    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); 
    taskExecutor.setConcurrencyLimit(GRID_SIZE); 
    return taskExecutor; 
    } 

    @Bean 
    public Step partitioningStep() throws NonTransientResourceException, Exception { 

    return stepBuilderFactory.get("partitioningStep") 
       .partitioner("processStep", partitioner()) 
       .step(processStep()) 
       .taskExecutor(taskExecutor()) 
       .build(); 
    } 

    @Bean 
    public Step processStep() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception { 

    return stepBuilderFactory.get("processStep") 
      .<List<Pojo>, List<Pojo>> chunk(COMMIT_INTERVAL) 
      .reader(processReader()) 
      .writer(processWriter()) 
      .taskExecutor(taskExecutor()) 
      .build(); 
    } 

    @Bean 
    public ProcessReader processReader() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception { 

    return new ProcessReader(); 
    } 

    @Bean 
    public ProcessWriter processWriter() { 

    return new ProcessWriter(); 
    } 
} 

partitionnerクラス

public class MyPartitionner implements Partitioner{ 

@Autowired 
private IService service; 

@Override 
public Map<String, ExecutionContext> partition(int gridSize) { 

    // list of 300 object partitionned like bellow 
    ... 
    Map<String, ExecutionContext> partitionData = new HashMap<String, ExecutionContext>(); 

    ExecutionContext executionContext0 = new ExecutionContext(); 
    executionContext0.putString("from", Integer.toString(0)); 
    executionContext0.putString("to", Integer.toString(100)); 
    partitionData.put("Partition0", executionContext0); 

    ExecutionContext executionContext1 = new ExecutionContext(); 
    executionContext1.putString("from", Integer.toString(101)); 
    executionContext1.putString("to", Integer.toString(200)); 
    partitionData.put("Partition1", executionContext1); 

    ExecutionContext executionContext2 = new ExecutionContext(); 
    executionContext2.putString("from", Integer.toString(201)); 
    executionContext2.putString("to", Integer.toString(299)); 
    partitionData.put("Partition2", executionContext2); 

    return partitionData; 
} 
} 

リーダークラス

public class ProcessReader implements ItemReader<List<Pojo>>, ChunkListener { 

    @Autowired 
    private IService service; 

    private StepExecution stepExecution; 

    private static List<String> processedIntervals = new ArrayList<String>(); 

    @Override 
    public List<Pojo> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { 

     System.out.println("Instance reference: "+this.toString()); 

     if(stepExecution.getExecutionContext().containsKey("from") && stepExecution.getExecutionContext().containsKey("to")){ 

      Integer from = Integer.valueOf(stepExecution.getExecutionContext().get("from").toString()); 
      Integer to = Integer.valueOf(stepExecution.getExecutionContext().get("to").toString()); 

      if(from != null && to != null && !processedIntervals.contains(from + "" + to) && to < BatchConfiguration.myList.size()){ 
       processedIntervals.add(String.valueOf(from + "" + to)); 
       return BatchConfiguration.myList.subList(from, to); 
      } 
     } 

     return null; 
    } 

    @Override 
    public void beforeChunk(ChunkContext context) { 

     this.stepExecution = context.getStepContext().getStepExecution(); 
    } 

    @Override 
    public void afterChunk(ChunkContext context) { } 

    @Override 
    public void afterChunkError(ChunkContext context) { } 

    } 
    } 

ライタークラス

public class ProcessWriter implements ItemWriter<List<Pojo>>{ 

    private final static Logger LOGGER = LoggerFactory.getLogger(ProcessWriter.class); 

    @Autowired 
    private IService service; 

    @Override 
    public void write(List<? extends List<Pojo>> pojos) throws Exception { 

     if(!pojos.isEmpty()){ 
      for(Pojo item : pojos.get(0)){ 
       try { 
        service.remove(item.getId()); 
       } catch (Exception e) { 
        LOGGER.error("Error occured while removing the item [" + item.getId() + "]", e); 
       } 
      } 
     } 
    } 
} 

あなたは私のコードが間違っているものを私に教えてくださいことはできますか?私のリーダライタ豆宣言に@StepScopeを追加することで解決

答えて

0

@Configuration 
@Import({ DataSourceConfiguration.class}) 
public class BatchConfiguration { 

    ... 

    @Bean 
    @StepScope 
    public ProcessReader processReader() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception { 

     return new ProcessReader(); 
    } 

    @Bean 
    @StepScope 
    public ProcessWriter processWriter() { 

    return new ProcessWriter(); 
    } 

    ... 

} 

このところで、あなたは、私は、各パーティションのchunck(リーダライタ)の別のインスタンスを持っています。

関連する問題