2017-03-23 6 views
1

私の目的は、csvファイルからデータを読み込み、自分のrddをscala/sparkのデータフレームに変換することです。これは私のコードです:Scala:Serializableを使用した製品でパラメータが使用されない

このスタッドで
package xxx.DataScience.CompensationStudy 

import org.apache.spark._ 
import org.apache.log4j._ 

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.{col, udf} 
import org.apache.spark.sql.types._ 

import org.apache.spark.SparkContext 
import org.apache.spark.sql.SQLContext 


object CompensationAnalysis { 

    case class GetDF(profil_date:String, profil_pays:String, param_tarif2:String, param_tarif3:String, dt_titre:String, dt_langues:String, 
    dt_diplomes:String, dt_experience:String, dt_formation:String, dt_outils:String, comp_applications:String, 
    comp_interventions:String, comp_competence:String) 

    def main(args: Array[String]) { 

    Logger.getLogger("org").setLevel(Level.ERROR) 

    val conf = new SparkConf().setAppName("CompensationAnalysis ") 
    val sc = new SparkContext(conf) 

    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 


    val lines = sc.textFile("C:/Users/../Downloads/CompensationStudy.csv").flatMap { l => 


     l.split(",") match { 

     case field: Array[String] if field.size > 13 => Some(field(0), field(1), field(2), field(3), field(4), field(5), field(6), field(7), field(8), field(9), field(10), field(11), field(12)) 

     case field: Array[String] if field.size == 1 => Some((field(0), "default value")) 

     case _ => None 
     } 


    } 

、私はエラーが発生しました:Serializableを持つ製品は、パラメータ

val summary = lines.collect().map(x => GetDF(x("profil_date"), x("profil_pays"), x("param_tarif2"), x("param_tarif3"), x("dt_titre"), x("dt_langues"), x("dt_diplomes"), x("dt_experience"), x("dt_formation"), x("dt_outils"), x("comp_applications"), x("comp_interventions"), x("comp_competence"))) 

    val sum_df = summary.toDF() 

    df.printSchema 


    } 

} 

を取ることはありませんこれはスクリーンショットです:

enter image description here

お願いしますか?

+1

問題は、 'lines'を定義する' flatMap'にあります。コンパイラが推論できる唯一の型は 'RDD [Product with Serializable]'です。なぜならあなたは 'Option'sに異なる型を持っているからです。 –

答えて

2

改善すべき点がいくつかあります。例外を引き起こす最も緊急な問題は、@ CyrilleCorpetが指摘しているように、「Some[Tuple13],Some[Tuple2]およびNone.typeの戻り値をパターンマッチングする3つの異なる行。Option[Product with Serializable]flatMapに準拠しています。 (結果はIterable[T]でなければなりません)。

あなたがSome[Tuple13]Some[Tuple13]、およびNoneまたはSome[Tuple2]Some[Tuple2]、およびNoneを持っていた場合は、基本的に、あなたはしたほうが良いでしょう。

また、タイプのパターンマッチングはタイプイレーズのため一般的には好ましくありません。また、パターンマッチングはあなたの状況にはまったく適していません。

つまり、あなたのケースクラスにデフォルト値を設定できます。

case class GetDF(profile_date: String, 
       profile_pays: String = "default", 
       param_tarif2: String = "default", 
       ... 
) 

を次に、あなたのラムダに:

val tokens = l.split 
if (l.length > 13) { 
    Some(GetDf(l(0), l(1), l(2)...)) 
} else if (l.length == 1) { 
    Some(GetDf(l(0))) 
} else { 
    None 
} 

今、あなたはOption[GetDF]を返却しているすべてのケースで。すべてNoneを取り除き、GetDFのインスタンスだけを保持するには、flatMapRDDを使用できます。

+0

実際に、パターンマッチングの3つの異なる行は、 'Some [Tuple13]'、 'Some [Tuple2]'および 'None.type'型の値を返します。最小の上限は 'flatmap'のシグネチャ(結果は' Iterable [T] 'でなければなりません)に従う' Option [Product with Serializable] 'です。とにかくソリューションの妥当性は変わりません。 –

+0

良いキャッチ。私はそれに応じて更新しました。ありがとう。 – Vidya

関連する問題