ファイルを読み込んでコンテンツをStringに変換するケースがあります。その後、文字列を複数のペイロードに分割し、それらのペイロードを個別にキューに送信します。 JmsTransactionManagerを使用して、すべてのメッセージを送信するか、まったく送信しないようにします。JmsTransactionManagerと組み合わせたTransactionSynchronizationFactoryが機能しない
TXが成功したら、ファイルをアーカイブフォルダに移動します。それ以外の場合は、失敗したフォルダに移動します。私はこれを達成するためにtransactionSynchronizationFactoryを使用できることを読んだ。しかし、JmsTransactionManagerと組み合わせて、ファイルは移動されません。 PseudoTransactionManagerを使用すると、ファイルは移動しますが、私はJmsTransactionを失います。
私は問題を再現するために簡略化したバージョンを作成しました。 (この場合のファイルの内容が値の簡単なカンマ区切りリストです)
@Bean
public IntegrationFlow fileInboundAdaptor() {
return IntegrationFlows
.from(s -> s.file(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(5000)
.transactionSynchronizationFactory(transactionSynchronizationFactory())
.transactional(new JmsTransactionManager(connectionFactory))
)
)
.transform(Transformers.fileToString())
.split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
.handle((GenericHandler<String>) (payload, headers) -> {
jmsTemplate.send("SOME_QUEUE", (Session session) -> session.createTextMessage(payload));
return payload;
})
.channel(MessageChannels.queue("fileReadingResultChannel"))
.get();
}
transactionSynchronizationFactoryは次のようになります。
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
ExpressionParser parser = new SpelExpressionParser();
ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor
= new ExpressionEvaluatingTransactionSynchronizationProcessor();
syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
syncProcessor.setAfterCommitExpression(parser.parseExpression(
"payload.renameTo(new java.io.File('test/archive' " +
" + T(java.io.File).separator + 'ARCHIVE-' + payload.name))"));
syncProcessor.setAfterRollbackExpression(parser.parseExpression(
"payload.renameTo(new java.io.File('test/fail' " +
" + T(java.io.File).separator + 'FAILED-' + payload.name))"));
return new DefaultTransactionSynchronizationFactory(syncProcessor);
}
だから私の質問は次のとおりです。TransactionSynchronizationFactoryはPseudoTransactionManagerで動作しませんまたはJmsTransactionManagerでも動作するはずですか?私はJmsTransactionにtransactionSynchronizationを設定するために必要な
ソリューション
。このような何か:
public JmsTransactionManager transactionManager() {
JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(connectionFactory);
jmsTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return jmsTransactionManager;
}
にそれをオンにする必要があります'@ Bean'。その '.transactional()'オプションからApplicationContextに登録されていません –
これは問題ではありませんでした。私は私の元のポストで解決策を追加しました。 –