RDDが(String, Seq[(Double, Double, Int)])
の場合は、標準マップで繰り返し処理できます。
val data: RDD[(String, Seq[(Double, Double, Int)])] = _ //Your RDD Here
data.map {
case (key, value) =>
value.map {
case (first, second, third) => first * second * third
}
}
これはおそらく、あなたのデータを構造化するかなり扱いにくい方法であると私はいくつかの他の方法でDataframe
したり、データを構造化を検討します。
http://spark.apache.org/docs/latest/sql-programming-guide.htmlのデータフレーム/データセットについての情報がありますが、これはあなたの問題に適していて、快適でない場合はマップよりもSQLのようなステートメントを書くことができます。ここで
が完了し、汚れた例のようなもの
import org.apache.spark._
object SparkExample extends App {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("App")
val sess: SparkContext = new SparkContext(conf)
val data: Seq[(String, Seq[(Double, Double, Int)])] = Seq[(String, Seq[(Double, Double, Int)])](
("id1", Seq[(Double, Double, Int)]((1.1, 2.2, 3), (4.4, 5.5, 6), (7.7, 8.8, 9))),
("id2", Seq[(Double, Double, Int)]((10.10, 11.11, 12), (13.13, 14.14, 15)))
)
val rdd: RDD[(String, Seq[(Double, Double, Int)])] = sess.parallelize(data)
val d: Array[Seq[Double]] = rdd.map {
case (key, value) => value.map {
case (first, second, third) => first + second + third
}
}.collect()
println(d.mkString(", "))
}
うわー感謝です!実際に私はすぐに私の小さな問題を行うためにデータセットを使用しようとしています。お返事をありがとうございます! – Smallthing
それを受け入れられた答えとしてマークすることができれば、それは素晴らしいでしょう –