2017-10-09 2 views
0

spark 1.6.1を使用しています。GroupedDatasetを寄木細工に保存するか、それをtoDFに変換する

GroupDatasetをパーケットファイルに保存するためのAPIはありますか? または、DataFrameに変換します。

など。私はカスタムオブジェクト 'プロシージャ'を持っています。私はDataframeをプロシージャオブジェクトに変換しました。 その後、私はpatientIDでグループをやっています。 寄せ木細工のファイルにグループ化したり、これをDataframeとして他の機能に渡したいと思っていました。 ストレージ用のAPIを取得していないか、Dataframeに変換していません。

val procedureDs: Dataset[Procedure] = joinDf.select("patientid", "patientprocedureid", "procedurecode").as[Procedure] 
val groupedDs:GroupedDataset[Long, Procedure] = procedureDs.groupBy{ x => x.patientid } 

mapGroups

val a = groupedDs.mapGroups{ case (k,vs) => { (k, vs.toSeq)}} 

を適用した後には、エラーの下に与える:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.....PatientDiagnosis 
- array element class: "com....PatientDiagnosis" 
- field (class: "scala.collection.Seq", name: "_2") 
- root class: "scala.Tuple2" 

私は、エラーに変更明示的なエンコーダに

val a = groupedDigDs.mapGroups((k,vs) => (k, vs.toSeq))(org.apache.spark.sql.Encoders.bean(classOf[(Long, Seq[com....PatientDiagnosis])])) 

を与えようと試みていた:

java.lang.UnsupportedOperationException: Cannot infer type for class scala.Tuple2 because it is not bean-compliant 

答えて

1
GroupedData同じ

(火花2.xでRelationalGroupedDataset)、GroupedDataset(スパーク2.xでKeyValueGroupedDataset)は、保存する前に集約されなければなりません。あなたの目標は、さらに別のgroupByKeyあなたはmapGroupsを使用することができるのであれば

val groupedDs: GroupedDataset[K, V] = ??? 
// ... { case (k, xs) => (k, xs.toSeq) } to preserve key as well 
groupedDs.mapGroups { case (_, xs) => xs.toSeq } 

し、その結果を書き込みます。

関連する問題