以下のスパークコードは、私がしたいことを正しく示し、小さなデモデータセットで正しい出力を生成します。groupBy/aggregateのスパークマージ/結合
大量の本番データで同じタイプのコードを実行すると、実行時に問題が発生します。 Sparkジョブは約12時間、クラスタ上で実行され、失敗します。
次のコードを見るだけで、すべての行を爆発させて元に戻すのは効率的ではないようです。与えられたテストデータセットでは、array_value_1に3つの値があり、array_value_2に3つの値があり、3 * 3または9つの分解された行に分解する4番目の行。
大きなデータセットでは、このような配列列が5行あり、各列に10個の値が10^5展開された行に展開されますか?
提供されたSpark機能を見ると、私が望むことをする機能はありません。私はユーザー定義関数を提供することができます。それにはスピードの欠点はありますか?
val sparkSession = SparkSession.builder.
master("local")
.appName("merge list test")
.getOrCreate()
val schema = StructType(
StructField("category", IntegerType) ::
StructField("array_value_1", ArrayType(StringType)) ::
StructField("array_value_2", ArrayType(StringType)) ::
Nil)
val rows = List(
Row(1, List("a", "b"), List("u", "v")),
Row(1, List("b", "c"), List("v", "w")),
Row(2, List("c", "d"), List("w")),
Row(2, List("c", "d", "e"), List("x", "y", "z"))
)
val df = sparkSession.createDataFrame(rows.asJava, schema)
val dfExploded = df.
withColumn("scalar_1", explode(col("array_value_1"))).
withColumn("scalar_2", explode(col("array_value_2")))
// This will output 19. 2*2 + 2*2 + 2*1 + 3*3 = 19
logger.info(s"dfExploded.count()=${dfExploded.count()}")
val dfOutput = dfExploded.groupBy("category").agg(
collect_set("scalar_1").alias("combined_values_2"),
collect_set("scalar_2").alias("combined_values_2"))
dfOutput.show()
全く問題が修正され、単純な平坦化のUDFの最初のソリューション。スパークは、失敗する前に〜12時間かかってから30分で正常に仕事を完了するまで行った。 SparkモニタのGUIを見て、各内部タスクは1分以内に実行され、完了します。これに助けてくれてありがとう。 – clay
私は驚いていることを認めなければならないが、それを聞いてうれしい。私は小さな改善を期待しましたが、それほど印象的なものはありません。個々のリストの大きさはどれくらいですか? – zero323