val valuesRdd = sc.parallelize(Seq(
//((x, y), v)
((0, 0), 5.5),
((1, 0), 7.7)
val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)])
(123, Array((0, 0), (1, 0)))
そして、あなたはすべての値(index, (x, y), v)
、この場合は、(123, (0,0), 5.5)
と(123, (1,0), 7.7)
を使用してこれを行うことができ、両方のRDDSは、共通の列(x, y)
を持っていますが、そのうちの一つは、実際にArray[(x, y)]
val explodedIndices = indicesRdd.flatMap{case (index, coords: Array[(Int, Int)]) => coords.map{case (x, y) => (index, (x, y))}}
// Each row exploded into multiple rows (index, (x, y))
val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)}
// Each row keyed by the coordinates (x, y)
val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)}
// Each row keyed by the coordinates (x, y)
// Because we have common keys, we can join!
val joined = keyedIndices.join(keyedValues)
ありがとうございました。 flatMapステートメントで使用されている '_'に関するエラーを発行します:'拡張された関数のタイプのパラメータがありません... ' – EdgeRover
Ok。これは小さな修正を加えて動作しました: 'val explodedIndices = qual.flatMap {case(index、coords:Array [(Long、Long)])=> coords.map {case(x、y)=>(index、 y))}} '。ありがとうございました。 – EdgeRover
素晴らしい!私は実際にそれを実行しようとしなかった答えを修正した。 – spiffman