2016-09-14 5 views
2

以下のスパークコードは、私がしたいことを正しく示し、小さなデモデータセットで正しい出力を生成します。groupBy/aggregateのスパークマージ/結合

大量の本番データで同じタイプのコードを実行すると、実行時に問題が発生します。 Sparkジョブは約12時間、クラスタ上で実行され、失敗します。

次のコードを見るだけで、すべての行を爆発させて元に戻すのは効率的ではないようです。与えられたテストデータセットでは、array_value_1に3つの値があり、array_value_2に3つの値があり、3 * 3または9つの分解された行に分解する4番目の行。

大きなデータセットでは、このような配列列が5行あり、各列に10個の値が10^5展開された行に展開されますか?

提供されたSpark機能を見ると、私が望むことをする機能はありません。私はユーザー定義関数を提供することができます。それにはスピードの欠点はありますか?

val sparkSession = SparkSession.builder. 
    master("local") 
    .appName("merge list test") 
    .getOrCreate() 

val schema = StructType(
    StructField("category", IntegerType) :: 
    StructField("array_value_1", ArrayType(StringType)) :: 
    StructField("array_value_2", ArrayType(StringType)) :: 
    Nil) 

val rows = List(
    Row(1, List("a", "b"), List("u", "v")), 
    Row(1, List("b", "c"), List("v", "w")), 
    Row(2, List("c", "d"), List("w")), 
    Row(2, List("c", "d", "e"), List("x", "y", "z")) 
) 

val df = sparkSession.createDataFrame(rows.asJava, schema) 

val dfExploded = df. 
    withColumn("scalar_1", explode(col("array_value_1"))). 
    withColumn("scalar_2", explode(col("array_value_2"))) 

// This will output 19. 2*2 + 2*2 + 2*1 + 3*3 = 19 
logger.info(s"dfExploded.count()=${dfExploded.count()}") 

val dfOutput = dfExploded.groupBy("category").agg(
    collect_set("scalar_1").alias("combined_values_2"), 
    collect_set("scalar_2").alias("combined_values_2")) 

dfOutput.show() 

答えて

6

それはexplodeに非効率的かもしれないが、基本的に、あなたが実装しようとする操作が簡単に高価です。効果的にはそれはちょうど別のgroupByKeyであり、あなたがそれをより良くするためにここでできることはあまりありません。あなたが直接collect_listと平らにできスパーク> 2.0を使用しているので:

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten.distinct) 

df 
    .groupBy("category") 
    .agg(
    flatten(collect_list("array_value_1")), 
    flatten(collect_list("array_value_2")) 
) 

custom Aggregatorを使用することも可能であるが、私はこれらのいずれかが、大きな違いを生むだろう疑います。

セットが比較的大きく、あなたは重複のかなりの数を期待する場合は、変更可能なセットでaggregateByKeyを使用するように試みることができる:

import scala.collection.mutable.{Set => MSet} 

val rdd = df 
    .select($"category", struct($"array_value_1", $"array_value_2")) 
    .as[(Int, (Seq[String], Seq[String]))] 
    .rdd 

val agg = rdd 
    .aggregateByKey((MSet[String](), MSet[String]()))( 
    {case ((accX, accY), (xs, ys)) => (accX ++= xs, accY ++ ys)}, 
    {case ((accX1, accY1), (accX2, accY2)) => (accX1 ++= accX2, accY1 ++ accY2)} 
) 
    .mapValues { case (xs, ys) => (xs.toArray, ys.toArray) } 
    .toDF 
+1

全く問題が修正され、単純な平坦化のUDFの最初のソリューション。スパークは、失敗する前に〜12時間かかってから30分で正常に仕事を完了するまで行った。 SparkモニタのGUIを見て、各内部タスクは1分以内に実行され、完了します。これに助けてくれてありがとう。 – clay

+0

私は驚いていることを認めなければならないが、それを聞いてうれしい。私は小さな改善を期待しましたが、それほど印象的なものはありません。個々のリストの大きさはどれくらいですか? – zero323