4
私はそうのような構造化されたストリームを実装行かなければならなかった
...スパーク
myDataSet
.map(r => StatementWrapper.Transform(r))
.writeStream
.foreach(MyWrapper.myWriter)
.start()
.awaitTermination()
このすべてが動作しているようですが、MyWrapper.myWriterのスループットで探して恐ろしいです。これは、効果的にJDBCのシンクになろうとしてだが、それは次のようになります。
val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {
var connection: Connection = _
override def open(partitionId: Long, version: Long): Boolean = {
Try (connection = getRemoteConnection).isSuccess
}
override def process(row: Seq[String]) {
val statement = connection.createStatement()
try {
row.foreach(s => statement.execute(s))
} catch {
case e: SQLSyntaxErrorException => println(e)
case e: SQLException => println(e)
} finally {
statement.closeOnCompletion()
}
}
override def close(errorOrNull: Throwable) {
connection.close()
}
}
だから私の質問をされて - 行ごとにインスタンス化新しいForeachWriterですか?したがって、open()およびclose()はデータセットのすべての行に対して呼び出されますか?
スループットを改善するためのより良い設計がありますか?
SQL文を一度解析して何度も実行するには、データベース接続を開いたままにしますか?
更新:あなたは、データがどのように扱われるかをより詳細に制御したい場合
は、あなたはバッチIDおよび基礎となる
DataFrame
を与えるSink
形質を実装することができます。各トランザクションの接続を閉じたり開いたりしていないようです。 – Exie更新 - 私はstatement.closeOnCompletion()を単にstatement.close()に変更しようとしましたが、改善は見られませんでした。 – Exie