2017-12-06 4 views
0

私はAlpakkaとそのJMSコネクタで遊んで、Oracle AQからデータをデキューします。私は、以下の非常に基本的な実装を、thisガイドに従って実行することができます。Alpakka JMSトランザクション

私はそれをトランザクションにすることができますので、例外がスローされるとメッセージが失われないことを保証できます。

object ConsumerApp extends App { 
    implicit val system: ActorSystem = ActorSystem("actor-system") 
    implicit val materializer: ActorMaterializer = ActorMaterializer() 

    val connectionFactory = AQjmsFactory.getConnectionFactory(getOracleDataSource()) 

    val out = JmsSource.textSource(
     JmsSourceSettings(connectionFactory).withQueue("My_Queue") 
    ) 

    val sink = Sink.foreach { message: String => 
     println("in sink: " + message) 
     throw new Exception("") // !!! MESSAGE IS LOST !!! 
    } 

    out.runWith(sink, materializer) 
} 

それはPL/SQLだった場合、解決策は次のようになります:ストリームのステージが失敗したときに

DECLARE 
    dequeue_options   DBMS_AQ.DEQUEUE_OPTIONS_T; 
    message_properties   DBMS_AQ.MESSAGE_PROPERTIES_T; 
    message_handle    RAW (44); 
    msg      SYS.AQ$_JMS_TEXT_MESSAGE; 
BEGIN 
    DBMS_AQ.dequeue (
     queue_name   => 'My_Queue', 
     dequeue_options  => dequeue_options, 
     message_properties => message_properties, 
     payload    => msg, 
     msgid    => message_handle 
); 

    -- do something with the message 

    COMMIT; 
END; 

答えて

2

デフォルトの動作では、ストリーム全体をシャットダウンすることです。あなたはストリームにどのようにしてhandle errorsをしたいか決めなければなりません。 1つのアプローチは、例えば、バックオフ戦略を用いてストリームをrestartにすることです。

また、Alpakka JMSコネクタを使用しているため、acknowledgement modeClientAcknowledge(これはAlpakka 0.15以降で使用可能)に設定してください。この構成では、承認されなかったメッセージはJMSソースを介して再度配信されます。例:

val jmsSource: Source[Message, NotUsed] = JmsSource(
    JmsSourceSettings(connectionFactory) 
    .withQueue("My_Queue") 
    .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge) 
) 

val result = jmsSource 
    .map { 
    case textMessage: TextMessage => 
     val text = textMessage.getText 
     textMessage.acknowledge() 
     text 
    } 
    .runForeach(println) 
+0

これは、私が質問した数時間前にリリースされた新機能です。私はとても運がいいです:)。 – Feyyaz

+0

できるだけ早く試してみよう – Feyyaz

+0

今すぐ試してみてはいかがですか?パブリックリポジトリにはまだありません。 – Feyyaz

関連する問題