1
私はDataset
の行のバッチをSparkで作成しようとしています。 サービスに送信されるレコードの数を維持するために、データを送信するレートを維持できるようにアイテムをバッチします。 のために、私は入力Dataset[Person]
は100を持っている場合たとえばDataset[PersonBatch]
データセットスパークスケーラの一括処理
を作成したい与えDataset[Person]
については
case class Person(name:String, address: String)
case class PersonBatch(personBatch: List[Person])
は、すべてのPersonBatch
がn
レコードのリストであるべきところDataset[PersonBatch]
ようにする必要があり、出力Dataset
を記録(人) 。
私はこれを試しましたが、うまくいきませんでした。
object DataBatcher extends Logger {
var batchList: ListBuffer[PersonBatch] = ListBuffer[PersonBatch]()
var batchSize: Long = 500 //default batch size
def addToBatchList(batch: PersonBatch): Unit = {
batchList += batch
}
def clearBatchList(): Unit = {
batchList.clear()
}
def createBatches(ds: Dataset[Person]): Dataset[PersonBatch] = {
val dsCount = ds.count()
logger.info(s"Count of dataset passed for creating batches : ${dsCount}")
val batchElement = ListBuffer[Person]()
val batch = PersonBatch(batchElement)
ds.foreach(x => {
batch.personBatch += x
if(batch.personBatch.length == batchSize) {
addToBatchList(batch)
batch.requestBatch.clear()
}
})
if(batch.personBatch.length > 0) {
addToBatchList(batch)
batch.personBatch.clear()
}
sparkSession.createDataset(batchList)
}
}
このジョブをHadoopクラスタで実行します。 これで助けてもらえますか?
これは役に立ちましたか? –
質問があればお気軽にお問い合わせください –