2017-09-21 13 views
2

tryParquet関数は、ParquetファイルからDatasetを読み込もうとします。そうでない場合、それは計算し、持続し、提供されたデータセット計画戻っ:Spark Dataset [T]コンストラクタとしての一般的なT

import scala.util.{Try, Success, Failure} 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.Dataset 

sealed trait CustomRow 

case class MyRow(
    id: Int, 
    name: String 
) extends CustomRow 

val ds: Dataset[MyRow] = 
    Seq((1, "foo"), 
     (2, "bar"), 
     (3, "baz")).toDF("id", "name").as[MyRow] 

def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] = 
    Try(session.read.parquet(path)) match { 
     case Success(df) => df.as[T] // <---- compile error here 
     case Failure(_) => { 
     target.write.parquet(path) 
     target 
     } 
    } 

val readyDS: Dataset[MyRow] = 
    tryParquet(spark, "/path/to/file.parq", ds) 

しかしこれはdf.as[T]にコンパイルエラーを生成した:

データセットに格納されているタイプのエンコーダを見つけることができません。

他の型をシリアル化するためのサポートは、将来のリリースで追加される予定です。

場合成功(DF)=> df.as [T]

一つは、型なしDataFrameを返し、所望のコンストラクタに発信者キャストせるtryParquetキャストdfすることによってこの問題を回避することができます。しかし、型が内部的に関数によって管理されるようにするための解決策はありますか?

答えて

5

は、typeパラメータでEncoderを使用することにより可能です以下のようになります。

import org.apache.spark.sql.Encoder 

def tryParquet[T <: CustomRow: Encoder](...) 

この方法では、コンパイラはオブジェクトを構築するときdf.as[T]はエンコーダを提供していることを証明することができます。

関連する問題