Spring Integrationを使用してOracleデータベースに特定のステータスのレコードを照会する、小さなSpring起動アプリケーションがあります。JpaPollingChannelAdapterには、JdbcPollingChannelAdapterのUpdateSqlと同様の機能がありますか?
最初にJdbcPollingChannelAdapterを使用して作業POCを作成しました。これは、見つかったレコードのステータスを変更して再スキャンされないようにする更新ステートメントも定義しました。
このアプリケーションではHibernateを使用しているため、Jdbcの実装をJPAのアプローチに置き換えたいと考えていました。
このように、JpaExecutorを使用してJpaPollingChannelAdapterを実装し、結果をエンティティ形式で取得することに成功しました。
私が達成しようとしているのは、ポーリングと同じトランザクションで見つかったすべてのレコードを更新するJdbcのアプローチと同様の動作です。
アダプター内でこれを達成する適切な方法はありますか、または単にメッセージハンドラー内でエンティティdaoを使用する必要がありますか?
うまくいけばうまくいきます。
更新:
私は(それが唯一の検索のためだからInboundChannelAdapterに対して)私はOutboundChannelAdapterまたはゲートウェイを使用する必要がありますと仮定していthrough the docsを探しています。
私は、1つのハンドラ内ですべてを行うことができれば、または複数のチャネルを定義する必要がある場合は、適切に配線する方法になります。取得するには1つを選択し、エンティティのステータスを更新します。 JpaExecutor
はdeleteAfterPoll
オプションを持っているJpaPollingChannelAdapter
機能については
@Configuration
public class IntegrationConfiguration {
private final Log log = LogFactory.getLog(getClass());
@Autowired
private DataSource dataSource;
@Autowired
private EntityManager entityManager;
@Autowired
private Reactor rootReactor;
@Autowired
private RunDao runDao;
@Bean
@InboundChannelAdapter(channel = "notificationChannel", poller = @Poller(fixedDelay = "60000", maxMessagesPerPoll = "-1"))
public MessageSource<?> jpaMessageSource() {
return new JpaPollingChannelAdapter(jpaSelectExecutor());
}
@Bean
@Gateway(requestChannel = "notificationChannel")
public void updateMessageStatus() {
JpaOutboundGateway gateway = new JpaOutboundGateway(jpaUpdateExecutor());
gateway.setGatewayType(OutboundGatewayType.UPDATING);
}
@Bean
public JpaExecutor jpaSelectExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManager);
executor.setJpaQuery("select R from Run R where R.notificationStatus = 'NOT_SENT' and R.runStatus.status = 'COMPLETE' and R.runConfig.notificationRecipients is not null");
executor.setEntityClass(Run.class);
return executor;
}
@Bean
public JpaExecutor jpaUpdateExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManager);
executor.setJpaQuery("update Run R set R.notificationStatus = 'SENDING' where R.RunId = :RunId");
executor.setEntityClass(Run.class);
return executor;
}
@Bean
@ServiceActivator(inputChannel = "notificationChannel")
public MessageHandler jpaMessageHandler() {
MessageHandler handler = new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
for (Run run : (ArrayList<Run>) message.getPayload()) {
rootReactor.notify("send-email-notification", Event.wrap(run));
//run.setNotificationStatus(Run.NotificationStatus.SENDING);
//runDao.merge(run);
}
}
};
return handler;
}
@Bean
public IntegrationFlow pollingAdapterFlow() {
return IntegrationFlows
.from(jpaMessageSource())
.handle(jpaMessageHandler())
.get();
}
}
ありがとうございましたArtem、私はfixedDelayを使ってJpaOutboundGatewayマージアプローチを調べます。 – rcurrie
ちょうどフォローアップの質問。私はJpaOutboundGatewayを使用する2番目の提案を実装しようとしています。上記のコードのJpaPollingChannelAdapterの定義をJpaOutboundGatewayアダプタに置き換える必要がありますか?私は、レコードを取得するための選択クエリと、エンティティのステータスを更新するステップの両方を定義する方法に苦労しています。 – rcurrie
いいえ、あなたはしないでください。 SELECTには 'JpaPollingChannelAdapter'を使い、UPDATEには' JpaOutboundGateway'を使います。それは明確ですか? –