2017-10-18 17 views
4
私はそうのような構造化されたストリームを実装行かなければならなかった

...スパーク

myDataSet 
    .map(r => StatementWrapper.Transform(r)) 
    .writeStream 
    .foreach(MyWrapper.myWriter) 
    .start() 
    .awaitTermination() 

このすべてが動作しているようですが、MyWrapper.myWriterのスループットで探して恐ろしいです。これは、効果的にJDBCのシンクになろうとしてだが、それは次のようになります。

val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] { 

    var connection: Connection = _ 

    override def open(partitionId: Long, version: Long): Boolean = { 
    Try (connection = getRemoteConnection).isSuccess 
    } 

    override def process(row: Seq[String]) { 
    val statement = connection.createStatement() 
    try { 
     row.foreach(s => statement.execute(s)) 
    } catch { 
     case e: SQLSyntaxErrorException => println(e) 
     case e: SQLException => println(e) 
    } finally { 
     statement.closeOnCompletion() 
    } 
    } 

    override def close(errorOrNull: Throwable) { 
    connection.close() 
    } 
} 

だから私の質問をされて - 行ごとにインスタンス化新しいForeachWriterですか?したがって、open()およびclose()はデータセットのすべての行に対して呼び出されますか?

スループットを改善するためのより良い設計がありますか?

SQL文を一度解析して何度も実行するには、データベース接続を開いたままにしますか?

+0

更新:あなたは、データがどのように扱われるかをより詳細に制御したい場合

は、あなたはバッチIDおよび基礎となるDataFrameを与えるSink形質を実装することができます。各トランザクションの接続を閉じたり開いたりしていないようです。 – Exie

+0

更新 - 私はstatement.closeOnCompletion()を単にstatement.close()に変更しようとしましたが、改善は見られませんでした。 – Exie

答えて

2

基本的なシンクの開閉は、ForeachWriterに依存します。

ForeachWriterを呼び出し、関連するクラスがForeachSinkであり、これはあなたのライターを呼び出すコードです:作家の

data.queryExecution.toRdd.foreachPartition { iter => 
    if (writer.open(TaskContext.getPartitionId(), batchId)) { 
    try { 
     while (iter.hasNext) { 
     writer.process(encoder.fromRow(iter.next())) 
     } 
    } catch { 
     case e: Throwable => 
     writer.close(e) 
     throw e 
    } 
    writer.close(null) 
    } else { 
    writer.close(null) 
    } 
} 

開閉は、あなたのソースから生成されforeachのバッチを実行しようとしました。 opencloseを文字通り開いてシンクドライバを閉じるようにしたい場合は、実装してシンクドライバを閉じる必要があります。私はいくつかのログを追加 -

trait Sink { 
    def addBatch(batchId: Long, data: DataFrame): Unit 
}