2017-03-14 5 views
1

私はScalaを初めて使用しましたが、何らかの練習をしながらエラーに遭遇しました。ScalaのRDD操作でエラーが発生しました

RDDをDataFrameに変換しようとしましたが、私のコードは次のとおりです。

package com.sclee.examples 

import com.sun.org.apache.xalan.internal.xsltc.compiler.util.IntType 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}; 


object App { 
    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("examples").setMaster("local") 
    val sc = new SparkContext(conf) 

    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    case class Person(name: String, age: Long) 

    val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20))) 
    val df = personRDD.map({ 
     case Row(val1: String, val2: Long) => Person(val1,val2) 
    }).toDS() 

// val ds = personRDD.toDS() 
    } 
} 

私はSparkのマニュアルの指示に従い、rddをデータフレームに変換する方法を示すいくつかのブログを参照しましたが、私は以下のエラーを受け取りました。

Error:(20, 27) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases. 
    val df = personRDD.map({ 

私は自分で問題を解決しようとしましたが、失敗しました。どんな助けもありがとう。

答えて

1

次のコードは動作します(私はありませんアプリケーションのうち、代わりにSparkContextSqlContext

  • 移動Personクラスの

    • 使用SparkSession

      import org.apache.spark.rdd.RDD 
      import org.apache.spark.sql.SparkSession 
      
      case class Person(name: String, age: Long) 
      object SparkTest { 
          def main(args: Array[String]): Unit = { 
      
          // use the SparkSession of Spark 2 
          val spark = SparkSession 
           .builder() 
           .appName("Spark SQL basic example") 
           .config("spark.some.config.option", "some-value") 
           .getOrCreate() 
      
          import spark.implicits._ 
      
          // this your RDD - just a sample how to create an RDD 
          val personRDD: RDD[Person] = spark.sparkContext.parallelize(Seq(Person("A",10),Person("B",20))) 
      
          // the sparksession has a method to convert to an Dataset 
          val ds = spark.createDataset(personRDD) 
          println(ds.count()) 
          } 
      } 
      

      私は、次の変更を加えましたなぜ私がしなければならなかったか これ)

    • 使用createDatasetは、変換

    のためにしかし、私はそれがこの変換を行うにはかなり珍しいですね、あなたはおそらく、直接read方法

    を使用してDatasetにあなたの入力を読みたいです
  • 関連する問題