0

Apache Sparkジョブを実行するときに直面した問題の1つは、RDDの各要素を互いに掛け合わせることです。 だけで、私はこれに似た何かをしたい、スパークRDDの要素を互いに掛け合わせる

enter image description here

を置く現在、私は、各「foreachの」の2つのイテレータを使用して、これをやっています。私の直感は、これが非常に効率的なやり方でできるということです。

for (elementOutSide <- iteratorA) { 
    for (elementInside <- iteratorB) { 
    if (!elementOutSide.get(3).equals(elementInside.get(3))) { 
     val multemp = elementInside.getLong(3) * elementOutSide.getLong(3) 
     .... 
     ... 

}}} 

状況を改善し改善するのにお手伝いできますか?前もって感謝します .. !!

+0

私はあなたが通常のデカルト結合を探していると思います。 – Alec

+1

ところで、あなたの実装は実際には要件に適合しません。元のRDDのレコードが_unique_の場合にのみ動作する、実際の_elements_と_indices_は比較されません。 –

+0

これらはユニークで、RDDはそれを保証するSQLクエリを使用して構築されます。 – Infamous

答えて

1

コメントで指摘されているように、これはデカルト結合です。ここでは、我々は2つずつの非同一Int Sの乗算に興味RDD[(Int, String)]、上で行うことができる方法は次のとおりです。

val rdd: RDD[(Int, String)] = sc.parallelize(Seq(
    (1, "aa"), 
    (2, "ab"), 
    (3, "ac") 
)) 

// use "cartesian", then "collect" to map only relevant results 
val result: RDD[Int] = rdd.cartesian(rdd).collect { 
    case ((t1: Int, _), (t2: Int, _)) if t1 != t2 => t1 * t2 
} 

注:この実装は、指示通りに入力レコードが、一意である前提としています。一致しない場合は、値の代わりにインデックスを比較しながら、デカルト結合とrdd.zipWithIndexの結果のマッピングを実行できます。

関連する問題