0
私はAlpakka
とそのJMS
コネクタで遊んで、Oracle AQ
からデータをデキューします。私は、以下の非常に基本的な実装を、thisガイドに従って実行することができます。Alpakka JMSトランザクション
私はそれをトランザクションにすることができますので、例外がスローされるとメッセージが失われないことを保証できます。
object ConsumerApp extends App {
implicit val system: ActorSystem = ActorSystem("actor-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val connectionFactory = AQjmsFactory.getConnectionFactory(getOracleDataSource())
val out = JmsSource.textSource(
JmsSourceSettings(connectionFactory).withQueue("My_Queue")
)
val sink = Sink.foreach { message: String =>
println("in sink: " + message)
throw new Exception("") // !!! MESSAGE IS LOST !!!
}
out.runWith(sink, materializer)
}
それはPL/SQL
だった場合、解決策は次のようになります:ストリームのステージが失敗したときに
DECLARE
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW (44);
msg SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
DBMS_AQ.dequeue (
queue_name => 'My_Queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => msg,
msgid => message_handle
);
-- do something with the message
COMMIT;
END;
これは、私が質問した数時間前にリリースされた新機能です。私はとても運がいいです:)。 – Feyyaz
できるだけ早く試してみよう – Feyyaz
今すぐ試してみてはいかがですか?パブリックリポジトリにはまだありません。 – Feyyaz