2016-09-15 12 views
0

別のRDDの配列の要素に基づいて、RDDの要素は、私は各要素が行列Aの単一の要素を表す、RDD、rdd1を有する取得:スパークスパークScalaの枠組みにおいて

val rdd1 = dist.map{case (((x,y),z,v)) => ((x,y),v)}

xy、行を表す列と vがマトリックスAの値を表します。

Iはまた、各要素の配列は、その要素で表される特定indexに必要なrdd1に格納されているマトリックスAの要素の集合を表し、RDD[index, Array[(x, y)]]の形で別のRDD、rdd2を有します。

今、私は何をする必要があるか、index(x,y)vを含むすべてのデータを保持し、各indexためのマトリックスA要素の値を取得することです。これを行う際には、どのような良い方法がありますか?

答えて

1

私が正しく理解していれば、あなたの質問は、つまるところ:

val valuesRdd = sc.parallelize(Seq(
//((x, y), v) 
    ((0, 0), 5.5),    
    ((1, 0), 7.7) 
)) 

val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)]) 
    (123, Array((0, 0), (1, 0))) 
)) 

そして、あなたはすべての値(index, (x, y), v)、この場合は、(123, (0,0), 5.5)(123, (1,0), 7.7)を取得するために、これらのRDDSをマージしたいですか?

あなたは間違いなく、joinを使用してこれを行うことができ、両方のRDDSは、共通の列(x, y)を持っていますが、そのうちの一つは、実際にArray[(x, y)]を持っているので、あなたは、最初の行のセットの中にそれを爆発する必要があると思いますので、:

val explodedIndices = indicesRdd.flatMap{case (index, coords: Array[(Int, Int)]) => coords.map{case (x, y) => (index, (x, y))}} 
// Each row exploded into multiple rows (index, (x, y)) 

val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)} 
// Each row keyed by the coordinates (x, y) 

val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)} 
// Each row keyed by the coordinates (x, y) 

// Because we have common keys, we can join! 
val joined = keyedIndices.join(keyedValues) 
+0

ありがとうございました。 flatMapステートメントで使用されている '_'に関するエラーを発行します:'拡張された関数のタイプのパラメータがありません... ' – EdgeRover

+0

Ok。これは小さな修正を加えて動作しました: 'val explodedIndices = qual.flatMap {case(index、coords:Array [(Long、Long)])=> coords.map {case(x、y)=>(index、 y))}} '。ありがとうございました。 – EdgeRover

+0

素晴らしい!私は実際にそれを実行しようとしなかった答えを修正した。 – spiffman

関連する問題