0
下記のように3フィールドのrddがあります。1フィールドrddの値の選択方法rddの第2フィールドにある場合のみ
1,2,6
2,4,6
1,4,9
3,4,7
2,3,8
今、上記のrddから、私はrddにしたがいます。
2,4,6
3,4,7
2,3,8
結果rddには1から始まる行がありません.1は入力rddの2番目のフィールドにはありません。
下記のように3フィールドのrddがあります。1フィールドrddの値の選択方法rddの第2フィールドにある場合のみ
1,2,6
2,4,6
1,4,9
3,4,7
2,3,8
今、上記のrddから、私はrddにしたがいます。
2,4,6
3,4,7
2,3,8
結果rddには1から始まる行がありません.1は入力rddの2番目のフィールドにはありません。
[OK]を、私はあなたが何をしたいのかを正しく理解している場合、2つの方法があります。
ありRDDがあるあなたのRDD
最初RDD「は、第2フィールド」と第二の一意の値が含まれている2に分割「第1の値」をキーとする。その後、rddsを一緒に結合します。このアプローチの欠点は、distinct
とjoin
が動作が遅いことです。
val r: RDD[(String, String, Int)] = sc.parallelize(Seq(
("1", "2", 6),
("2", "4", 6),
("1", "4", 9),
("3", "4", 7),
("2", "3", 8)
))
val uniqueValues: RDD[(String, Unit)] = r.map(x => x._2 ->()).distinct
val r1: RDD[(String, (String, String, Int))] = r.map(x => x._1 -> x)
val result: RDD[(String, String, Int)] = r1.join(uniqueValues).map {case (_, (x, _)) => x}
result.collect.foreach(println)
あなたのRDDは比較的小さく、二値のSet
はその後、その後、あなたはメモリ内の最初のステップとして設定されているを作成することができ、すべてのノードでメモリに完全に収まるすべてのノードにブロードキャストすることができた場合ちょうどあなたのRDDをフィルタリング:
val r: RDD[(String, String, Int)] = sc.parallelize(Seq(
("1", "2", 6),
("2", "4", 6),
("1", "4", 9),
("3", "4", 7),
("2", "3", 8)
))
val uniqueValues = sc.broadcast(r.map(x => x._2).distinct.collect.toSet)
val result: RDD[(String, String, Int)] = r.filter(x => uniqueValues.value.contains(x._1))
result.collect.foreach(println)
どちらの例の出力を:
(2,4,6)
(2,3,8)
(3,4,7)
あなたは番目を提供することができますeフルタイプの入出力RDDを作成し、データをどのようにフィルタリングして変換するかについての規則を詳述します。 – Aivean
フィールドとフィールド2は文字列、フィールド3は整数です。 Field1の値がField2で使用可能な出力の行だけが必要です。上記の例では、2と3はrddのField2にありますが、1はField2にありません。 – Ahmad
あなたの質問をより良い説明やより良い例で更新する必要があります。 – Vishnu667