私はdStream.foreachRDDメソッドの中に処理ブロックを持っており、その処理にはspark sqlを使ってmysqlに永続化することが含まれます。投稿、私は別のスキーマ/テーブルで最新の処理オフセットを維持しています。ブロック全体をトランザクション(スカラ)にしたい。それを達成する方法? 以下は、コードからの抜粋です。トランザクションブロック| Spark SQL、rdd
.foreachRDD(rdd => {{... ................. ..................
df.write.mode( "append").jdbc(url + rawstore_schema + "?rewriteBatchedStatements = true"、tablesToFetch(index) 、connectionProperties)
................ metricsStatement.executeUpdate( "metrics.txn_offsets(topic、part、off、date_updated)に値を挿入します。 ..........................
}
両方の書き込み操作(処理済みデータとオフセットデータ)は2つの異なるデータベース/接続で行われるため、それらをトランザクションにする方法はありますか?
ありがとうございました