2017-09-01 12 views
0

をLIBSVMするデータフレームにゼロ膨張したデータを変換する私はスカラ座に非常に新しいです(一般的に、私はRでこれを行う)スカラ/スパーク:

私はある大きなデータフレーム(2000+列、100000+行)をインポートしていますゼロ膨張した。

  1. を次のように私は手順を理解したようなフォーマット

    ステップ をLIBSVMするためにデータを変換するには

    タスク があるDoubleTypeに設定し、ターゲットがint

  2. であるとしている機能の列を確認してください
  3. 各行を繰り返し、1つの配列に0以上の値を保持し、別の配列にその列のインデックスを付けます。
  4. 私はステップ2間違ってやっているので、私は3(多分)に貼り付けていますLIBSVM形式でRDD [LabeledPoint]
  5. 保存RDDへ

に変換します。ここで

が私のコードです:

主な機能:

@Test 
def testSpark(): Unit = 
{ 
try 
{ 

    var mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv") 


    val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType) 

    val indexer = new StringIndexer() 
    .setInputCol("Majors_Final") 
    .setOutputCol("Majors_Final_Indexed") 
    val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped) 
    val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType) 



    //only doubles accepted by sparse vector, so that's what we filter for 
    val fieldSeq: scala.collection.Seq[StructField] = schema.fields.toSeq.filter(f => f.dataType == DoubleType) 

    val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name) 


    val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF() 


    assertTrue(true) 
} 
catch 
{ 
    case ex: Exception => 
    { 

    println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}") 
    fail() 
    } 
    } 
} 

LabeledPointに各行を変換します

@throws(classOf[Exception]) 
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label:Int): LabeledPoint = 
{ 
    try 
    { 
    val values: Map[String, Double] = rowIn.getValuesMap(fieldNameSeq) 

    val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*) 
    val rowValuesItr: Iterable[Double] = sortedValuesMap.values 

    var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]() 
    var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]() 
    var currentPosition: Int = 0 
    rowValuesItr.foreach 
    { 
     kv => 
     if (kv > 0) 
     { 
      valuesArray += kv; 
      positionsArray += currentPosition; 
     } 
     currentPosition = currentPosition + 1; 
    } 

    val lp:LabeledPoint = new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(positionsArray.size,positionsArray.toArray, valuesArray.toArray)) 

    return lp 

    } 
    catch 
    { 
    case ex: Exception => 
    { 
     throw new Exception(ex) 
    } 
    } 
} 

問題 それでは、私が作成しようlaのデータフレームRDDに簡単に変換することができます。

val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF() 

しかし、私は次のエラーを取得:

SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for seri alizing other types will be added in future releases. [INFO] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()

+1

あなたが言及したエラーメッセージとして '輸入spark.implicits._'を試してみましたか?また、通常は 'return'はスカラでは使用されません。問題が生じる可能性があります。 – Shaido

+0

タスクはシリアライズ可能ではなく、org.apache.spark.SparkException:タスクは暗黙のうちにシリアライズできません – Jake

+0

私は行列を見る必要があると思います – Jake

答えて

0

OKを、私はデータフレームをスキップしてLabeledPointsのシューッという音の配列を簡単にRDDに変換されて作成されました。残りは簡単です。

私はこれが機能しているが、私はスカラーが新しく、これを行うためのより効率的な方法があると強調しています。次のように

主な機能は以下のようになります。

val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv") 
    val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType) 

    val indexer = new StringIndexer() 
    .setInputCol("Majors_Final") 
    .setOutputCol("Majors_Final_Indexed") 
    val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped) 
    val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType) 

    mDFFinal.show() 
    //only doubles accepted by sparse vector, so that's what we filter for 
    val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType) 
    val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name) 

    var positionsArray: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]() 

    mDFFinal.collect().foreach 
    { 

    row => positionsArray+=convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed")); 

    } 

    val mRdd:RDD[LabeledPoint]= spark.sparkContext.parallelize(positionsArray.toSeq) 

    MLUtils.saveAsLibSVMFile(mRdd, "./output/libsvm")