をLIBSVMするデータフレームにゼロ膨張したデータを変換する私はスカラ座に非常に新しいです(一般的に、私はRでこれを行う)スカラ/スパーク:
私はある大きなデータフレーム(2000+列、100000+行)をインポートしていますゼロ膨張した。
- を次のように私は手順を理解したようなフォーマット
ステップ をLIBSVMするためにデータを変換するには
タスク があるDoubleTypeに設定し、ターゲットがint
であるとしている機能の列を確認してください
- 各行を繰り返し、1つの配列に0以上の値を保持し、別の配列にその列のインデックスを付けます。
- 私はステップ2間違ってやっているので、私は3(多分)に貼り付けていますLIBSVM形式でRDD [LabeledPoint]
- 保存RDDへ
に変換します。ここで
が私のコードです:
主な機能:
@Test
def testSpark(): Unit =
{
try
{
var mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)
val indexer = new StringIndexer()
.setInputCol("Majors_Final")
.setOutputCol("Majors_Final_Indexed")
val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)
//only doubles accepted by sparse vector, so that's what we filter for
val fieldSeq: scala.collection.Seq[StructField] = schema.fields.toSeq.filter(f => f.dataType == DoubleType)
val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)
val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
assertTrue(true)
}
catch
{
case ex: Exception =>
{
println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}")
fail()
}
}
}
LabeledPointに各行を変換します
@throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label:Int): LabeledPoint =
{
try
{
val values: Map[String, Double] = rowIn.getValuesMap(fieldNameSeq)
val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
val rowValuesItr: Iterable[Double] = sortedValuesMap.values
var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
var currentPosition: Int = 0
rowValuesItr.foreach
{
kv =>
if (kv > 0)
{
valuesArray += kv;
positionsArray += currentPosition;
}
currentPosition = currentPosition + 1;
}
val lp:LabeledPoint = new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(positionsArray.size,positionsArray.toArray, valuesArray.toArray))
return lp
}
catch
{
case ex: Exception =>
{
throw new Exception(ex)
}
}
}
問題 それでは、私が作成しようlaのデータフレームRDDに簡単に変換することができます。
val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
しかし、私は次のエラーを取得:
SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for seri alizing other types will be added in future releases. [INFO] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
あなたが言及したエラーメッセージとして '輸入spark.implicits._'を試してみましたか?また、通常は 'return'はスカラでは使用されません。問題が生じる可能性があります。 – Shaido
タスクはシリアライズ可能ではなく、org.apache.spark.SparkException:タスクは暗黙のうちにシリアライズできません – Jake
私は行列を見る必要があると思います – Jake