2016-04-14 11 views
3

から一致列は、私は別のデータフレームが最初のデータフレーム内の各列の係数を含むスパーク2つのデータフレーム

+---+---+------+---+ 
| sp|sp2|colour|sp3| 
+---+---+------+---+ 
| 0| 1|  1| 0| 
| 1| 0|  0| 1| 
| 0| 0|  1| 0| 
+---+---+------+---+ 

以下のようなフォーマットのデータフレームを有します。たとえば

+------+------+---------+------+ 
| CE_sp|CE_sp2|CE_colour|CE_sp3| 
+------+------+---------+------+ 
| 0.94| 0.31|  0.11| 0.72| 
+------+------+---------+------+ 

ここで、2番目のデータフレームからスコアを加算して計算される最初のデータフレームに列を追加します。

ex。

+---+---+------+---+-----+ 
| sp|sp2|colour|sp3|Score| 
+---+---+------+---+-----+ 
| 0| 1|  1| 0| 0.42| 
| 1| 0|  0| 1| 1.66| 
| 0| 0|  1| 0| 0.11| 
+---+---+------+---+-----+ 

すなわち

r -> row of first dataframe 
score = r(0)*CE_sp + r(1)*CE_sp2 + r(2)*CE_colour + r(3)*CE_sp3 

列と異なっていてもよい列の順序のn個存在し得ます。

ありがとうございました!!!

+1

ので、あなたの第二のデータフレームは、4つの値で、実際に1行が含まれていますか? – eliasah

+0

@eliasahの値は増加する可能性がありますが、2番目のデータフレームでは行の数は常に1に留まります。 – nareshbabral

+1

実際には2番目のDataFrameは必要ありません – eliasah

答えて

4

迅速かつ簡単:

import org.apache.spark.sql.functions.col 

val df = Seq(
    (0, 1, 1, 0), (1, 0, 0, 1), (0, 0, 1, 0) 
).toDF("sp","sp2", "colour", "sp3") 

val coefs = Map("sp" -> 0.94, "sp2" -> 0.32, "colour" -> 0.11, "sp3" -> 0.72) 
val score = df.columns.map(
    c => col(c) * coefs.getOrElse(c, 0.0)).reduce(_ + _) 

df.withColumn("score", score) 

そしてPySparkで同じこと:ここで

from pyspark.sql.functions import col 

df = sc.parallelize([ 
    (0, 1, 1, 0), (1, 0, 0, 1), (0, 0, 1, 0) 
]).toDF(["sp","sp2", "colour", "sp3"]) 

coefs = {"sp": 0.94, "sp2": 0.32, "colour": 0.11, "sp3": 0.72} 
df.withColumn("score", sum(col(c) * coefs.get(c, 0) for c in df.columns)) 
+0

それは目的を果たす – nareshbabral

1

私はあなたがやろうとしていることを達成するための多くの方法があると信じています。すべてのケースで、私がコメントで述べたように、その2番目のDataFrameは必要ありません。私はこれが役に立てば幸い

import org.apache.spark.ml.feature.{ElementwiseProduct, VectorAssembler} 
import org.apache.spark.mllib.linalg.{Vectors,Vector => MLVector} 

val df = Seq((0, 1, 1, 0), (1, 0, 0, 1), (0, 0, 1, 0)).toDF("sp", "sp2", "colour", "sp3") 

// Your coefficient represents a dense Vector 
val coeffSp = 0.94 
val coeffSp2 = 0.31 
val coeffColour = 0.11 
val coeffSp3 = 0.72 

val weightVectors = Vectors.dense(Array(coeffSp, coeffSp2, coeffColour, coeffSp3)) 

// You can assemble the features with VectorAssembler 
val assembler = new VectorAssembler() 
    .setInputCols(df.columns) // since you need to compute on all your columns 
    .setOutputCol("features") 

// Once these features assembled we can perform an element wise product with the weight vector 
val output = assembler.transform(df) 
val transformer = new ElementwiseProduct() 
    .setScalingVec(weightVectors) 
    .setInputCol("features") 
    .setOutputCol("weightedFeatures") 

// Create an UDF to sum the weighted vectors values 
import org.apache.spark.sql.functions.udf 
def score = udf((score: MLVector) => { score.toDense.toArray.sum }) 

// Apply the UDF on the weightedFeatures 
val scores = transformer.transform(output).withColumn("score",score('weightedFeatures)) 
scores.show 
// +---+---+------+---+-----------------+-------------------+-----+ 
// | sp|sp2|colour|sp3|   features| weightedFeatures|score| 
// +---+---+------+---+-----------------+-------------------+-----+ 
// | 0| 1|  1| 0|[0.0,1.0,1.0,0.0]|[0.0,0.31,0.11,0.0]| 0.42| 
// | 1| 0|  0| 1|[1.0,0.0,0.0,1.0]|[0.94,0.0,0.0,0.72]| 1.66| 
// | 0| 0|  1| 0| (4,[2],[1.0])|  (4,[2],[0.11])| 0.11| 
// +---+---+------+---+-----------------+-------------------+-----+ 

は、ここに1つの方法です。より多くの質問があれば、躊躇しないでください。

+0

こんにちは。列 ".setInputCols(df.columns)//では、すべての列で計算する必要があるため"ではなく、一部の列のみを選択することは可能ですか? – user1384205

+0

私はあなたの質問を得るのか分からない。 – eliasah

+0

私は文字列とint列のdfを持っています。すべてのint列の重み付けされたフィーチャの列を追加したいと思います。上記の場合、すべての列(sp、sp2、color、sp3)は重みを計算するために使用されます。私の場合、私は重みの列を計算するためにわずかな列を選択したいと思います。これは可能ですか?私はこのvalフィールドのようなものを試しました:Array [String] = Array( "sAtt1"、 "sAtt2"、 "sAtt3"、 "sAtt4"、 "sAtt5"、 "sAtt6"、 "sAtt7"、 "sAtt8"、 "sAtt9" ( "features") – user1384205

1

はシンプルなソリューションです:

scala> df_wght.show 
+-----+------+---------+------+ 
|ce_sp|ce_sp2|ce_colour|ce_sp3| 
+-----+------+---------+------+ 
| 1|  2|  3|  4| 
+-----+------+---------+------+ 

scala> df.show 
+---+---+------+---+ 
| sp|sp2|colour|sp3| 
+---+---+------+---+ 
| 0| 1|  1| 0| 
| 1| 0|  0| 1| 
| 0| 0|  1| 0| 
+---+---+------+---+ 

その後、我々は単純なクロスを行うことができますジョインとクロスプロダクト

val scored = df.join(df_wght).selectExpr("(sp*ce_sp + sp2*ce_sp2 + colour*ce_colour + sp3*ce_sp3) as final_score") 

出力:

scala> scored.show 
+-----------+                 
|final_score| 
+-----------+ 
|   5| 
|   5| 
|   3| 
+-----------+ 
関連する問題