2017-01-08 8 views
0

ScalaとSparkでまったく新鮮! 一連の(String, (Double, Double, Int))データをk-vアレイとして得た。今このデータのgroupByKey()メソッドは、いくつかの(String, Seq[(Double, Double, Int)])グループを取得します。 最初の大きなグループに入り、Seqパートをフェッチして次の大きなグループに移動するにはどうしたらいいですか? 言ってやれば、Spark RDD全体で要素のすべての要素と要素のメンバーを反復処理するにはどうすればよいですか?

("id1", [(1.1,2.2,3), (4.4,5.5,6), (7.7,8.8,9)]), 
("id2", [(10.10,11.11,12), (13.13,14.14,15)]) 

私の記憶にあります。私は "id1"を掘り下げて3つのデータを繰り返し見て、さらにいくつかの変更を加えます。それから私は "id2"にジャンプし、データを繰り返します。 どのようにコードするのですか? > _ <

答えて

3

RDDが(String, Seq[(Double, Double, Int)])の場合は、標準マップで繰り返し処理できます。

val data: RDD[(String, Seq[(Double, Double, Int)])] = _ //Your RDD Here 
data.map { 
    case (key, value) => 
    value.map { 
     case (first, second, third) => first * second * third 
    } 
    } 

これはおそらく、あなたのデータを構造化するかなり扱いにくい方法であると私はいくつかの他の方法でDataframeしたり、データを構造化を検討します。

http://spark.apache.org/docs/latest/sql-programming-guide.htmlのデータフレーム/データセットについての情報がありますが、これはあなたの問題に適していて、快適でない場合はマップよりもSQLのようなステートメントを書くことができます。ここで

が完了し、汚れた例のようなもの

import org.apache.spark._ 

object SparkExample extends App { 

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("App") 
    val sess: SparkContext = new SparkContext(conf) 

    val data: Seq[(String, Seq[(Double, Double, Int)])] = Seq[(String, Seq[(Double, Double, Int)])](
    ("id1", Seq[(Double, Double, Int)]((1.1, 2.2, 3), (4.4, 5.5, 6), (7.7, 8.8, 9))), 
    ("id2", Seq[(Double, Double, Int)]((10.10, 11.11, 12), (13.13, 14.14, 15))) 
) 

    val rdd: RDD[(String, Seq[(Double, Double, Int)])] = sess.parallelize(data) 

    val d: Array[Seq[Double]] = rdd.map { 
    case (key, value) => value.map { 
     case (first, second, third) => first + second + third 
    } 
    }.collect() 
    println(d.mkString(", ")) 
} 
+0

うわー感謝です!実際に私はすぐに私の小さな問題を行うためにデータセットを使用しようとしています。お返事をありがとうございます! – Smallthing

+0

それを受け入れられた答えとしてマークすることができれば、それは素晴らしいでしょう –

関連する問題