2016-08-24 15 views
2

私はハイブでクエリを実行しようとしています:ここApacheのスパーク - ハイブ内部結合、LIMITおよびカスタムUDF

は(私はより多くを行うカスタムUDFを使用して=が、イムを行うことができます知っている最も簡単な設定ですAおよびB 30,000行の周りcustom_UDF_Equals_Comparisonは単にa.id = b.id

との間の等価性チェックを行い、各

SELECT * FROM a INNER JOIN b ON Custom_UDF_Equals_Comparison(a.id, b.id) LIMIT 5

ある)だけ等価比較より

データセット

私はこのクエリを実行すると、多くのm/rタスクが実行されていることをログ出力で確認することができますが、すべての可能な順列が比較され、5のLIMITをはるかに上回るまで私はほとんどのデータが各テーブルの最初の数行に参加できることを知っているので、いくつかのm/rタスクしか期待していません)、なぜこれが起こりますか?どのように修正することができますか?

編集:比較のためにUDFを使用する際に2 RDDの間の完全な比較が行われている理由

ハイテクzero323が、それは同様の問題であるが、正確ではありませんが、それは説明していますが、LIMITが停止しない理由、それは説明のdoesnt限界値5が見つかった場合の比較。たとえば、最初の10回の参加試行で5行が見つかった場合、それは残りの30,000回* 30,000回の試行の間に移動します。すべての結合が行われた後に制限が適用されるためですか?例えばそれは30,000 * 30,000の行に参加し、それらをわずか5に減らしますか?

EDIT2:join_views( "Fuzzy_String"、 "0.1"、a.col1、b.col1)LIMIT 5 ON

def levenshtein(str1: String, str2: String): Int = { 
val lenStr1 = str1.length 
val lenStr2 = str2.length 

val d: Array[Array[Int]] = Array.ofDim(lenStr1 + 1, lenStr2 + 1) 

for (i <- 0 to lenStr1) d(i)(0) = i 
for (j <- 0 to lenStr2) d(0)(j) = j 

for (i <- 1 to lenStr1; j <- 1 to lenStr2) { 
    val cost = if (str1(i - 1) == str2(j-1)) 0 else 1 

    d(i)(j) = min(
    d(i-1)(j ) + 1,  // deletion 
    d(i )(j-1) + 1,  // insertion 
    d(i-1)(j-1) + cost // substitution 
) 
} 

d(lenStr1)(lenStr2) 

}

def min(nums: Int*): Int = nums.min 

def join_views(joinType: String, parameters: Any, col1: Any, col2: Any) : Boolean = { 
if (joinType == "Equals") { 
    if (col1 == null || col2 == null) { 
    return false 
    } 

    return col1 == col2 
} 
else if (joinType == "Fuzzy_String") { 
    if (col1 == null || col2 == null) { 
    return false 
    } 

    val val1 = col1.asInstanceOf[String] 
    val val2 = col2.asInstanceOf[String] 

    val ratio = Utils.distancePercentage(val1, val2) 

    if (ratio == 1.0) { 
    return val1 == val2 
    } 

    return (ratio >= parameters.asInstanceOf[Double]) 
} 

return false; 

}

... = 20秒

... ON join_views( "Fuzzy_String"、 "0.9"、a.col1、b.col1)LIMIT 5 = 100secs

+0

それは閉じ滞在することができ、助け –

+0

のためのおかげで、私は実際に私のcustom_UDFもファジーチェックを行い、当惑させる何かを発見し、私は0.1のファジー値と一致しなければならないことを実行すると、それは非常に迅速に戻り合流結果は(例えば5行に非常に速くマッチして戻ります)、0.9に設定するとUDFの元の=比較と似ています。なぜ0.1のファジーマッチがより速く戻るのだろうか?必ずしも質問ではなく、ただの観測 –

+0

これは実際に興味深い。実装の詳細を分かち合うことができますか? – zero323

答えて

1

だから、三つの異なる問題がここにあります

  • スパークは、最適化ハッシュを使用し、これらの最適化は唯一の等結合に適用可能であるので、ソートすることによって結合します。 UDFに依存するジョインを含む他のタイプのジョインは、ペアワイズ比較、したがってデカルト積を必要とします。詳細はWhy using a UDF in a SQL query leads to cartesian product?をご覧ください。
  • データ移動後の操作、特にシャッフルを完全に最適化することはできません。 Sun Ruiによって提供されたnice answerTowards limiting the big RDDの素晴らしい説明を見つけることができます。

    あなたのケースはシャッフルの不足のために逆説的に簡単ですが、パーティションを一緒に持っていく必要があります。

  • 制限の最適化はレコードではなくパーティションに基づいています。 Sparkが最初のパーティションをチェックし、条件を満たしている要素の数が必要以上に少ない場合は、反復ごとにパーティションの数を増やして反復します(ただし、この要素が4であることがわかっている限り)。あなたがまれなイベントを探しているなら、これはかなり高速になります。

関連する問題