Map [String、String]にはDataset Encoderはありませんが、実際に作成することはできません。
安全なものと安全なものの2つのバージョンがあります。あなたがやりたいことをするには、次の2つのバージョンがあります。効果的にあなたは、計算を行うためにRDDレベルに削減する必要があります:
case class OnFrame(df: DataFrame) {
import df.sparkSession.implicits._
/**
* If input columns don't match we'll fail at query evaluation.
*/
def unsafeRDDMap: RDD[Map[String, String]] = {
df.rdd.map(row => Map(row.getAs[String]("col1") -> row.getAs[String]("col2")))
}
/**
* Use Dataset-to-case-class mapping.
* If input columns don't match we'll fail before query evaluation.
*/
def safeRDDMap: RDD[Map[String, String]] = {
df
.select($"col1" as "key", $"col2" as "value")
.as[OnFrame.Entry]
.rdd
.map(_.toMap)
}
def unsafeMap(): Map[String, String] = {
unsafeRDDMap.reduce(_ ++ _)
}
def safeMap(): Map[String, String] = {
safeRDDMap.reduce(_ ++ _)
}
}
あなたの目標は、おそらく我々は、この一層効率的に単一のマップにすべてを収集することは潜在的なスパーク防止であるとして可能性が何であるかをより明確に提供する場合-pattern - データがドライバに収まることを意味します。
私はスパーク1.6を使用していますが、データセットのコンセプトは2.0を超えていますか? – Newbie
安全でないバージョンのみ。インポートは 'df.sqlContext.implicits._'に変更する必要があります。 –