2016-12-23 15 views
1

私は線形モデルのいくつかの機能を一般化することを目的としたScalaクラスを持っています - 具体的には、係数の配列と配列予測子を使用し、クラスはDataFrameからデータを取得し、単純な線形モデルを使用して、以下に示すようにDataFrame全体で予測を作成します。単一値の代わりに配列を返すscala/sparkマップ

私は予測値の列を生成すると期待している最後の行についています。私はいくつかのアプローチを試してきました(どれもがコメントアウトされています)。それが今、文句を言わない/型の不一致のC bは、コンパイルされたように、コード:

[error] found : Array[org.apache.spark.sql.Column] 
[error] required: org.apache.spark.sql.Column 
[error]  .withColumn("prediction", colMod(preds.map(p => data(p)))) 
[error]            ^

...私もpredを<で取得 - バージョンをpreds ...とforeachのバージョン:

[error] found : Unit 
[error] required: org.apache.spark.sql.Column 
[error]  .withColumn("prediction", colMod(preds.foreach(data(_)))) 
[error]             ^

問題を解決しようと無駄に努力していました...提案に感謝します。ここで

class LinearModel(coefficients: Array[Double], 
        predictors: Array[String], 
        data: DataFrame) { 

    val coefs = coefficients 
    val preds = Array.concat(Array("bias"), predictors) 
    require(coefs.length == preds.length) 

    /** 
     * predict: computes linear model predictions as the dot product of the coefficents and the 
     * values (X[i] in the model matrix) 
     * @param values: the values from a single row of the given variables from model matrix X 
     * @param coefs: array of coefficients to be applied to each of the variables in values 
     *    (the first coef is assumed to be 1 for the bias/intercept term) 
     * @return: the predicted value 
     */ 
    private def predict(values: Array[Double], coefs: Array[Double]): Unit = { 
     (for ((c, v) <- coefs.zip(values)) yield c * v).sum 
    } 

    /** 
     * colMod (udf): passes the values for each relevant value to predict() 
     * @param values: an Array of the numerical values of each of the specified predictors for a 
     *    given record 
     */ 
    private val colMod = udf((values: Array[Double]) => predict(values, coefs)) 

    val dfPred = data 
     // create the column with the prediction 
     .withColumn("prediction", colMod(preds.map(p => data(p)))) 
     //.withColumn("prediction", colMod(for (pred <- preds) yield data(pred))) 
     //.withColumn("prediction", colMod(preds.foreach(data(_)))) 
     // prev line should = colMod(data(pred1), data(pred2), ..., data(predn)) 
    } 

答えて

1

は、それが適切に行わできること方法です。

import org.apache.spark.sql.functions.{lit, col} 
import org.apache.spark.sql.Column 

def predict(coefficients: Seq[Double], predictors: Seq[String], df: DataFrame) = { 

    // I assume there is no predictor for bias 
    // but you can easily correct for that 
    val prediction: Column = predictors.zip(coefficients).map { 
    case (p, c) => col(p) * lit(c) 
    }.foldLeft(col("bias"))(_ + _) 

    df.withColumn("prediction", prediction) 
} 

使用例:結果ビーイングと

val df = Seq((1.0, -1.0, 3.0, 5.0)).toDF("bias", "x1", "x2", "x3") 

predict(Seq(2.0, 3.0), Seq("x1", "x3"), df) 

+----+----+---+---+----------+ 
|bias| x1| x2| x3|prediction| 
+----+----+---+---+----------+ 
| 1.0|-1.0|3.0|5.0|  14.0| 
+----+----+---+---+----------+ 

あなたのコードについて、あなたはしました多くの間違いをした:

  • Array[_]ArrayType列の有効な外部型ではありません。有効な外部表現はSeq[_]なので、udfに渡す関数の引数はSeq[Double]である必要があります。
  • udfに渡される関数は、Unitにできません。あなたの場合、それはDoubleでなければなりません。前のポイントと組み合わせると、有効な署名は(Seq[Double], Seq[Double]) => Doubleになります。
  • colModは、タイプColumnの1つの引数が必要です。

    import org.apache.spark.sql.functions.array 
    
    colMod(array(preds.map(col): _*)) 
    
  • あなたのコードはNULL/null安全ではありません。

関連する問題