2016-09-13 10 views
1

私はdStream.foreachRDDメソッドの中に処理ブロックを持っており、その処理にはspark sqlを使ってmysqlに永続化することが含まれます。投稿、私は別のスキーマ/テーブルで最新の処理オフセットを維持しています。ブロック全体をトランザクション(スカラ)にしたい。それを達成する方法? 以下は、コードからの抜粋です。トランザクションブロック| Spark SQL、rdd

.foreachRDD(rdd => {{... ................. ..................

df.write.mode( "append").jdbc(url + rawstore_schema + "?rewriteBatchedStatements = true"、tablesToFetch(index) 、connectionProperties)

................ metricsStatement.executeUpdate( "metrics.txn_offsets(topic、part、off、date_updated)に値を挿入します。 ..........................

}

両方の書き込み操作(処理済みデータとオフセットデータ)は2つの異なるデータベース/接続で行われるため、それらをトランザクションにする方法はありますか?

ありがとうございました

答えて

1

私は同じ質問がありました。スパークコード(v2.1まで)を見ると、トランザクション管理を指定するオプションはありません。

詳細はこちら:https://stackoverflow.com/a/42964361/47551

関連する問題