2016-12-29 8 views
2

私はScalaを初めて使い、データフレームをrddに変換したいと思っています。 ラベルに、の機能は、MLlibの入力用にRDD[labelPoint]に変換されます。しかし、私はWrappedArrayに対処する方法を見つけることができません。スカラーのデータフレーム(WrappedArray付き)をRDDに変換する[labelPoint]

scala> test.printSchema 
root 
|-- user_id: long (nullable = true) 
|-- brand_store_sn: string (nullable = true) 
|-- label: integer (nullable = true) 
|-- money_score: double (nullable = true) 
|-- normal_score: double (nullable = true) 
|-- action_score: double (nullable = true) 
|-- features: array (nullable = true) 
| |-- element: string (containsNull = true) 
|-- flag: string (nullable = true) 
|-- dt: string (nullable = true) 


scala> test.head 
res21: org.apache.spark.sql.Row = [2533,10005072,1,2.0,1.0,1.0,WrappedArray(["d90_pv_1sec:1.4471580313422192", "d3_pv_1sec:0.9030899869919435", "d7_pv_1sec:0.9030899869919435", "d30_pv_1sec:1.414973347970818", "d90_pv_week_decay:1.4235871662780681", "d1_pv_1sec:0.9030899869919435", "d120_pv_1sec:1.4471580313422192"]),user_positive,20161130] 

答えて

1

まず - LabeledPointDouble秒のベクトルを期待するので、私はあなたにもコロン(:)によって、すべてのfeatures配列の各要素を分割し、としてそれの右側を扱いたいと仮定していますダブル、例えば: -

"d90_pv_1sec:1.4471580313422192" --> 1.4471580313422192 

もしそうなら、ここでの変換です:

import org.apache.spark.mllib.linalg.{Vector, Vectors} 
import org.apache.spark.mllib.regression.LabeledPoint 

// sample data - DataFrame with label, features and other columns 
val df = Seq(
    (1, Array("d90_pv_1sec:1.4471580313422192", "d3_pv_1sec:0.9030899869919435"), 4.0), 
    (2, Array("d7_pv_1sec:0.9030899869919435", "d30_pv_1sec:1.414973347970818"), 5.0) 
).toDF("label", "features", "ignored") 

// extract relevant fields from Row and convert WrappedArray[String] into Vector: 
val result = df.rdd.map(r => { 
    val label = r.getAs[Int]("label") 
    val featuresArray = r.getAs[mutable.WrappedArray[String]]("features") 
    val features: Vector = Vectors.dense(
    featuresArray.map(_.split(":")(1).toDouble).toArray 
) 
    LabeledPoint(label, features) 
}) 

result.foreach(println) 
// (1.0,[1.4471580313422192,0.9030899869919435]) 
// (2.0,[0.9030899869919435,1.414973347970818]) 

EDITは:

"d90_pv_1sec:1.4471580313422192" --> index = 90; value = 1.4471580313422192 

修正されたコードは次のようになります:清澄あたり、今入力アレイ内の各項目を想定し、得られたベクターで予想インデックスが含ま

val vectorSize: Int = 100 // just a guess - should be the maximum index + 1 

val result = df.rdd.map(r => { 
    val label = r.getAs[Int]("label") 
    val arr = r.getAs[mutable.WrappedArray[String]]("features").toArray 
    // parse each item into (index, value) tuple to use in sparse vector 
    val elements = arr.map(_.split(":")).map { 
    case Array(s, d) => (s.replaceAll("d|_pv_1sec","").toInt, d.toDouble) 
    } 
    LabeledPoint(label, Vectors.sparse(vectorSize, elements)) 
}) 

result.foreach(println) 
// (1.0,(100,[3,90],[0.9030899869919435,1.4471580313422192])) 
// (2.0,(100,[7,30],[0.9030899869919435,1.414973347970818])) 

s.replaceAll("d|_pv_1sec","")を使用すると、各項目の正規表現を個別にコンパイルするので、少し遅いかもしれません。その場合、正規表現を使用しないより速い(まだ醜い)s.replace("d", "").replace("_pv_1sec", "")に置き換えることができます。

+0

答えに感謝します!できます! –

+0

er ...ラベルと疎な特徴ベクトルのようなフォーマットが必要です:LabeledPoint(ラベル、Ve​​ctors.sparse(3)、array( "d90_pv_1sec"、 "d3_pv_1sec")、Array(1.4471580313422192、0.9030899869919435)) –

+0

あなたは、 t - 'LabeledPoint.features'は' org.apache.spark.mllib.linalg.Vector'とタイプします。これは必然的に 'Doubles'のベクトルであり、Strings/Arraysではなく - https://github.com/apache/sparkを参照してください。 /blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala#L41 –

関連する問題