1

私はDatasetの行のバッチをSparkで作成しようとしています。 サービスに送信されるレコードの数を維持するために、データを送信するレートを維持できるようにアイテムをバッチします。 のために、私は入力Dataset[Person]は100を持っている場合たとえばDataset[PersonBatch]データセットスパークスケーラの一括処理

を作成したい与えDataset[Person]については

case class Person(name:String, address: String) 
case class PersonBatch(personBatch: List[Person]) 

は、すべてのPersonBatchnレコードのリストであるべきところDataset[PersonBatch]ようにする必要があり、出力Datasetを記録(人) 。

私はこれを試しましたが、うまくいきませんでした。

object DataBatcher extends Logger { 

    var batchList: ListBuffer[PersonBatch] = ListBuffer[PersonBatch]() 
    var batchSize: Long = 500 //default batch size 

    def addToBatchList(batch: PersonBatch): Unit = { 
    batchList += batch 
    } 

    def clearBatchList(): Unit = { 
    batchList.clear() 
    } 

    def createBatches(ds: Dataset[Person]): Dataset[PersonBatch] = { 

    val dsCount = ds.count() 
    logger.info(s"Count of dataset passed for creating batches : ${dsCount}") 
    val batchElement = ListBuffer[Person]() 
    val batch = PersonBatch(batchElement) 
    ds.foreach(x => { 
     batch.personBatch += x 
     if(batch.personBatch.length == batchSize) { 
     addToBatchList(batch) 
     batch.requestBatch.clear() 
     } 
    }) 
    if(batch.personBatch.length > 0) { 
     addToBatchList(batch) 
     batch.personBatch.clear() 
    } 
    sparkSession.createDataset(batchList) 
    } 
} 

このジョブをHadoopクラスタで実行します。 これで助けてもらえますか?

答えて

1

rdd.iteratorはグループ化された機能が役立ちます。例えば

がデータベースに挿入しようとし

ここiter.groupedでバッチ挿入を行うサンプルコードスニペット(バッチサイズ)の1000とイムを(バッチサイズ)をiter.grouped

df.repartition(numofpartitionsyouwant) // numPartitions ~ number of simultaneous DB connections you can planning to give... 
def insertToTable(sqlDatabaseConnectionString: String, 
        sqlTableName: String): Unit = { 

    val tableHeader: String = dataFrame.columns.mkString(",") 
    dataFrame.foreachPartition { partition => 
    //NOTE : EACH PARTITION ONE CONNECTION (more better way is to use connection pools) 
    val sqlExecutorConnection: Connection = 
     DriverManager.getConnection(sqlDatabaseConnectionString) 
    //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql 
    partition.grouped(1000).foreach { group => 
     val insertString: scala.collection.mutable.StringBuilder = 
     new scala.collection.mutable.StringBuilder() 

     group.foreach { record => 
     insertString.append("('" + record.mkString(",") + "'),") 
     } 

     sqlExecutorConnection 
     .createStatement() 
     .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES " 
      + insertString.stripSuffix(",")) 
    } 

    sqlExecutorConnection.close() // close the connection so that connections wont exhaust. 
    } 
} 
+0

これは役に立ちましたか? –

+0

質問があればお気軽にお問い合わせください –

関連する問題