2017-03-07 12 views
0

はコードを考えてみましょう:奇妙な挙動は

val df1 = spark.table("t1").filter(col("c1")=== lit(127)) 
val df2 = spark.sql("select x,y,z from ORCtable") 
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*), 
    trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter") 
df3.select($"y_R",$"z_R").show(500,false) 

これは警告WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.コードがjava.lang.OutOfMemoryError: GC overhead limit exceededを失敗を生産しています。

しかし、私は次のコードを実行した場合:

val df1 = spark.table("t1").filter(col("c1")=== lit(127)) 
val df2 = spark.sql("select x,y,z from ORCtable limit 2000000")//only difference here 
//ORC table has 1651343 rows so doesn't exceed limit 2000000 
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*), 
    trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter") 
df3.select($"y_R",$"z_R").show(500,false) 

これは正しい出力を生成します。私はこれがなぜ起こり、何が変わるのか、迷っている。誰かがこれを感知するのを助けることができますか?

答えて

2

私自身の質問に答えてください:physical execution planは同じdataframeを生成する2つの方法で異なり、.explain()メソッドを呼び出すことで確認できます。

第一の方法は、後者の方法は、典型的には遅いが、できるだけ多くのガベージコレクション株ないsort-merge joinを実行する一方java.lang.OutOfMemoryError: GC overhead limit exceededを引き起こすbroadcast-hash joinを使用します。

df2 dataframeの追加のfilter操作によって、この物理的な実行計画の違いが導入されています。