私は次のステップでデータベースに書き込むために取り出したJMSキューにメッセージを書き込もうとしています。最初の部分は2番目の非同期で同期する必要があります。 JMS部分は実際には遅いです(1分でキューに1100項目)。ActiveMQへのスプリングバッチ書き込み
これは私の仕事の様子です。
@Bean
public Job multiThreadedStepJob() {
Flow flow1 = new FlowBuilder<Flow>("subflow1").from(step()).end();
Flow flow2 = new FlowBuilder<Flow>("subflow2").from(step2()).end();
Flow splitFlow = new FlowBuilder<Flow>("splitflow")
.split(new SimpleAsyncTaskExecutor()).add(flow1, flow2) .build();
return jobBuilders.get("multiThreadedStepJob")
.start(splitFlow).end().build();
}
最初のステップ:
@Bean
public Step step() {
return stepBuilders.get("step")
.<OrderDTO, OrderDTO>chunk(CHUNK_SIZE)
.reader(reader())
.writer(writer())
.build();
}
第二段階:私は実行することができますので、私のエラーは、ステップの作家とSTEP2の読者の内側にあると思い
@Bean
public Step step2() {
return stepBuilders.get("step2")
.<OrderDTO, OrderDTO>chunk(100)
.reader(reader2())
.writer(writer2())
.build();
}
他の読者とライターは一緒にいて問題はありません。
@Bean
public JmsItemWriter<OrderDTO> writer() {
JmsItemWriter<OrderDTO> itemWriter = new JmsItemWriter<>();
itemWriter.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
return itemWriter;
}
@Bean
public JmsItemReader<OrderDTO> reader2() {
JmsItemReader<OrderDTO> itemReader = new JmsItemReader<>();
itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
itemReader.setItemType(OrderDTO.class);
return itemReader;
}
彼らは、キューに接続するための同じJmsTemplateを使用します。
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
jmsTemplate.setDefaultDestination(queue());
jmsTemplate.setReceiveTimeout(500);
return jmsTemplate;
}
@Bean
public Queue queue() {
return new ActiveMQQueue("orderList");
}
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
factory.setTrustAllPackages(true);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(30);
factory.setPrefetchPolicy(prefetchPolicy);
PooledConnectionFactory pool = new PooledConnectionFactory(factory);
pool.setMaxConnections(10);
pool.setMaximumActiveSessionPerConnection(10);
pool.isCreateConnectionOnStartup();
return pool;
}
私が使用した構成の残りの部分は@EnableBatchProcessingから構成です。なぜこれが遅いのか誰にも分かりますか?