2017-03-08 13 views
5

spudの構造体をudfに渡そうとしています。フィールド名を変更し、列の位置に名前を変更しています。どうすれば修正できますか?Spark Struct構造体の名前がUDFで変更される

object TestCSV { 

      def main(args: Array[String]) { 

      val conf = new SparkConf().setAppName("localTest").setMaster("local") 
      val sc = new SparkContext(conf) 
      val sqlContext = new SQLContext(sc) 


      val inputData = sqlContext.read.format("com.databricks.spark.csv") 
        .option("delimiter","|") 
        .option("header", "true") 
        .load("test.csv") 


      inputData.printSchema() 

      inputData.show() 

      val groupedData = inputData.withColumn("name",struct(inputData("firstname"),inputData("lastname"))) 

      val udfApply = groupedData.withColumn("newName",processName(groupedData("name"))) 

      udfApply.show() 
      } 



      def processName = udf((input:Row) =>{ 

       println(input) 
       println(input.schema) 

       Map("firstName" -> input.getAs[String]("firstname"), "lastName" -> input.getAs[String]("lastname")) 

       }) 

     } 

出力:

root 
|-- id: string (nullable = true) 
|-- firstname: string (nullable = true) 
|-- lastname: string (nullable = true) 

+---+---------+--------+ 
| id|firstname|lastname| 
+---+---------+--------+ 
| 1|  jack| reacher| 
| 2|  john|  Doe| 
+---+---------+--------+ 

エラー:

[jack,reacher] StructType(StructField(i[1],StringType,true), > StructField(i[2],StringType,true)) 17/03/08 09:45:35 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2) java.lang.IllegalArgumentException: Field "firstname" does not exist.

+0

あなただけのUDFに直接(Strings' 'など)2つの文字列を渡すことはありませんなぜですか? –

+0

Spark UDFでは、引数として10個以上のフィールドを渡すことはできません。私がここで提供したのは、単純化されたユースケースです。いつか私はUDFで20列以上を渡す必要があります。どのように達成するのですか? – hp2326

答えて

1

あなたが遭遇している何が本当に不思議です。ちょっと遊んだ後、私は最終的に、それがオプティマイザエンジンの問題に関連しているかもしれないと考えました。問題はUDFではなく、structの機能のようです。

私はgroupedDatacacheとき、私は、キャッシングなしで私はあなたの報告例外を取得し、それは(スパーク1.6.3)の仕事を得る:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.{SparkConf, SparkContext} 


object Demo { 

    def main(args: Array[String]): Unit = { 

    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[1]")) 
    val sqlContext = new HiveContext(sc) 
    import sqlContext.implicits._ 
    import org.apache.spark.sql.functions._ 


    def processName = udf((input: Row) => { 
     Map("firstName" -> input.getAs[String]("firstname"), "lastName" -> input.getAs[String]("lastname")) 
    }) 


    val inputData = 
     sc.parallelize(
     Seq(("1", "Kevin", "Costner")) 
    ).toDF("id", "firstname", "lastname") 


    val groupedData = inputData.withColumn("name", struct(inputData("firstname"), inputData("lastname"))) 
     .cache() // does not work without cache 

    val udfApply = groupedData.withColumn("newName", processName(groupedData("name"))) 
    udfApply.show() 
    } 
} 

また、あなたの構造体を作るためにRDDのAPIを使用しますが、このことができます本当に素敵ではありません。

case class Name(firstname:String,lastname:String) // define outside main 

val groupedData = inputData.rdd 
    .map{r => 
     (r.getAs[String]("id"), 
      Name(
      r.getAs[String]("firstname"), 
      r.getAs[String]("lastname") 
     ) 
     ) 
    } 
    .toDF("id","name") 
+0

ありがとう@ラファエルロス。これは今私のために働いた。私はこの答えを受け入れるだろう。 – hp2326

関連する問題