私が正しく理解していれば、あなたの質問は、つまるところ:
val valuesRdd = sc.parallelize(Seq(
//((x, y), v)
((0, 0), 5.5),
((1, 0), 7.7)
))
val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)])
(123, Array((0, 0), (1, 0)))
))
そして、あなたはすべての値(index, (x, y), v)
、この場合は、(123, (0,0), 5.5)
と(123, (1,0), 7.7)
を取得するために、これらのRDDSをマージしたいですか?
あなたは間違いなく、join
を使用してこれを行うことができ、両方のRDDSは、共通の列(x, y)
を持っていますが、そのうちの一つは、実際にArray[(x, y)]
を持っているので、あなたは、最初の行のセットの中にそれを爆発する必要があると思いますので、:
val explodedIndices = indicesRdd.flatMap{case (index, coords: Array[(Int, Int)]) => coords.map{case (x, y) => (index, (x, y))}}
// Each row exploded into multiple rows (index, (x, y))
val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)}
// Each row keyed by the coordinates (x, y)
val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)}
// Each row keyed by the coordinates (x, y)
// Because we have common keys, we can join!
val joined = keyedIndices.join(keyedValues)
ありがとうございました。 flatMapステートメントで使用されている '_'に関するエラーを発行します:'拡張された関数のタイプのパラメータがありません... ' – EdgeRover
Ok。これは小さな修正を加えて動作しました: 'val explodedIndices = qual.flatMap {case(index、coords:Array [(Long、Long)])=> coords.map {case(x、y)=>(index、 y))}} '。ありがとうございました。 – EdgeRover
素晴らしい!私は実際にそれを実行しようとしなかった答えを修正した。 – spiffman