2016-04-28 5 views
0

私は以下のような2つのデータセットを持っています。各データセットには、各行に「、」で区切られた数字があります。RDDの値を別の値に置き換える

データセット1

1,2,0,8,0

2,0,9,0,3

データセット2

7、 5,4,6,3

4,9,2,1,8

私はデータから対応する値に設定された第1のデータのゼロを交換する必要が

だから結果は、この

1,2,4,8のようになります。2.

を設定しました、2,9,9,1,3

3 Iは、以下のコードを使用して値を置き換えます。

val rdd1 = sc.textFile(dataset1).flatMap(l => l.split(",")) 
val rdd2 = sc.textFile(dataset2).flatMap(l => l.split(",")) 
val result = rdd1.zip(rdd2).map(x => if(x._1 == "0") x._2 else x._1) 

私が得た出力は、フォーマットRDD [文字列]です。しかし、出力形式はRDD [Array [String]]である必要があります。この形式は、それ以上の変換に適しているからです。

+0

あなたは 'valの結果= rdd1.zip(RDD2).MAP(X =>(x._1 == "0" の場合のような何かを探しています)配列(x._2)else配列(x._1)) '? @AlexisC。 –

+0

いいえ、rdd1とrdd2はRDD [Array [String]タイプです。あなたのコードのx._1は配列を参照しています – yAsH

+0

あなたのスニペットからはっきりしていません。 'RDD [String]'と 'x._1 ==" 0 "'を実行すると、分割後のフラット・マッピングです。どのように 'x._1'が配列を参照できるのでしょうか? 2行(各行に1つ)の 'RDD [Array [String]]が必要な場合を除きますか? –

答えて

2

RDD[Array[String]]を使用する場合は、配列の各要素が直線に対応する場合は、分割後に値をフラットにマップしないでください。

scala> val rdd1 = sc.parallelize(List("1,2,0,8,0", "2,0,9,0,3")).map(l => l.split(",")) 
rdd1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[1] at map at <console>:27 

scala> val rdd2 = sc.parallelize(List("7,5,4,6,3", "4,9,2,1,8")).map(l => l.split(",")) 
rdd2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at <console>:27 

scala> val result = rdd1.zip(rdd2).map{case(arr1, arr2) => arr1.zip(arr2).map{case(v1, v2) => if(v1 == "0") v2 else v1}} 
result: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:31 

scala> result.collect 
res0: Array[Array[String]] = Array(Array(1, 2, 4, 8, 3), Array(2, 9, 9, 1, 3)) 

または多分それほど冗長:

val result = rdd1.zip(rdd2).map(t => t._1.zip(t._2).map(x => if(x._1 == "0") x._2 else x._1)) 
+0

私は上記の結果のためのしきい値を持つ別のRDDをArray(配列(6,100)、Array(5,100)、Array(7,100)、Array(0,100)、Array 1,100))。結果RDDの各配列の値がこれらのしきい値の間にあるかどうかを確認するにはどうすればよいですか? – yAsH

関連する問題