2017-01-02 12 views
1

Sparkはデータを並列に処理しますが、操作は処理しません。私のDAGでは、カラムごとの関数を呼びたい Spark processing columns in parallel各カラムの値は他のカラムとは独立に計算できます。 spark-SQL APIを使ってこのような並列性を達成する方法はありますか?ウィンドウ関数を利用するSpark dynamic DAG is a lot slower and different from hard coded DAGは、DAGを多く最適化するのに役立ちましたが、連続した方法でのみ実行されます。sparkを並列カラムに適用する

情報は以下のhttps://github.com/geoHeil/sparkContrastCoding

最小例見つけることができますもう少し含まれている例:

val df = Seq(
    (0, "A", "B", "C", "D"), 
    (1, "A", "B", "C", "D"), 
    (0, "d", "a", "jkl", "d"), 
    (0, "d", "g", "C", "D"), 
    (1, "A", "d", "t", "k"), 
    (1, "d", "c", "C", "D"), 
    (1, "c", "B", "C", "D") 
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 

val inputToDrop = Seq("col3TooMany") 
val inputToBias = Seq("col1", "col2") 

val targetCounts = df.filter(df("TARGET") === 1).groupBy("TARGET").agg(count("TARGET").as("cnt_foo_eq_1")) 
val newDF = df.toDF.join(broadcast(targetCounts), Seq("TARGET"), "left") 
    newDF.cache 
def handleBias(df: DataFrame, colName: String, target: String = target) = { 
    val w1 = Window.partitionBy(colName) 
    val w2 = Window.partitionBy(colName, target) 

    df.withColumn("cnt_group", count("*").over(w2)) 
     .withColumn("pre2_" + colName, mean(target).over(w1)) 
     .withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D))) 
     .drop("cnt_group") 
    } 

val joinUDF = udf((newColumn: String, newValue: String, codingVariant: Int, results: Map[String, Map[String, Seq[Double]]]) => { 
    results.get(newColumn) match { 
     case Some(tt) => { 
     val nestedArray = tt.getOrElse(newValue, Seq(0.0)) 
     if (codingVariant == 0) { 
      nestedArray.head 
     } else { 
      nestedArray.last 
     } 
     } 
     case None => throw new Exception("Column not contained in initial data frame") 
    } 
    }) 

を今、私はすべての列に自分のhandleBias機能を適用したい、残念ながら、これは並行して実行されません。それは本当にあなたのケースを助けていない事実ですが、各列の

val res = (inputToDrop ++ inputToBias).toSet.foldLeft(newDF) { 
    (currentDF, colName) => 
     { 
     logger.info("using col " + colName) 
     handleBias(currentDF, colName) 
     } 
    } 
    .drop("cnt_foo_eq_1") 

val combined = ((inputToDrop ++ inputToBias).toSet).foldLeft(res) { 
    (currentDF, colName) => 
     { 
     currentDF 
      .withColumn("combined_" + colName, map(col(colName), array(col("pre_" + colName), col("pre2_" + colName)))) 
     } 
    } 

val columnsToUse = combined 
    .select(combined.columns 
     .filter(_.startsWith("combined_")) 
     map (combined(_)): _*) 

val newNames = columnsToUse.columns.map(_.split("combined_").last) 
val renamed = columnsToUse.toDF(newNames: _*) 

val cols = renamed.columns 
val localData = renamed.collect 

val columnsMap = cols.map { colName => 
    colName -> localData.flatMap(_.getAs[Map[String, Seq[Double]]](colName)).toMap 
}.toMap 

答えて

2

値が他の列

から独立して計算することができます。多くの独立した番号DataFramesを生成することができます。それぞれ独自の追加がありますが、これを自動的に1つの実行計画に組み合わせることはできません。

handleBiasの各アプリケーションは、データを2回シャッフルし、出力DataFramesは、DataFrameと同じデータ配信を持ちません。このため、列のリストの上にfoldがある場合、各追加は別々に実行する必要があります。

理論的にあなたはこのように(擬似コードで)表現することができるパイプラインを設計できます。

  • は、一意のIDを追加します。

    df_with_id = df.withColumn("id", unique_id()) 
    
  • 、それぞれが独立してDFと変換の計算〜ワイドフォーマット:

    dfs = for (c in columns) 
        yield handle_bias(df, c).withColumn(
        "pres", explode([(pre_name, pre_value), (pre2_name, pre2_value)]) 
    ) 
    
  • 組合のすべての部分的な結果:長いから幅広いフォーマットに変換する

    combined = dfs.reduce(union) 
    
  • ピボット:

    combined.groupBy("id").pivot("pres._1").agg(first("pres._2")) 
    

が、私はそれがすべての大騒ぎの価値がある疑い。使用するプロセスは非常に重く、重要なネットワークとディスクIOが必要です。

総レベル(sum count(distinct x)) for x in columns))の数が比較的少ない場合は、それ以外の場合はローカルで統計を計算することができますレベルにダウンサンプリングを検討例Map[Tuple2[_, _], StatCounter]aggregateByKeyのために使用して、単一パスを持つすべての統計を計算しようとすることができます。

関連する問題