2016-10-14 8 views
1

私はSparkとScalaで作業しています。私はArray[String]のRDDを持っています。 RDDには、(name, age, work, ...)のような属性の値が含まれています。私は、各属性のすべての一意の値を収集するために、一連の可変文字列(attributes)を使用しています。このようなものとして、RDDのSpark RDDから一意の値を持つScala Seqの集合を埋める方法は?

思う:

val someLength = 10 
val attributes = Seq.fill[mutable.Set[String]](someLength)(mutable.Set()) 
val splitLines = rdd.map(line => line.split("\t")) 

lines.foreach(line => { 
    for {(value, index) <- line.zipWithIndex} { 
    attributes(index).add(value) 
    // #1 
    } 
}) 

// #2 
:私は、次のコードを持っている

attributes = (("name1","name2","name3"),("21","22"),("JobA","JobB")) 

:最後に

("name1","21","JobA") 
("name2","21","JobB") 
("name3","22","JobA") 

私はこのような何かをしたいです

私がデバッグしてlinで停止するとe #1と表示されていますが、すべて正常です。attributesは一意の値で正しく埋められています。

ループの後、行#2で、属性が再び空です。その属性がセットのシーケンスで、その中に示しみると、大きさの全て0

Seq() 
Seq() 
... 

であること私が間違って何をしているのですか?私は気付いていない、何らかのスコープが続いていますか?

+0

専門的な好奇心からの質問:今Sparkを学習しているのなら、なぜRDD APIを使用していますか?なぜデータセット/データフレームではないのですか? – maasg

+0

@maasg私はOPではありませんが、Sparkで始めたばかりの場合は、抽象度の低いところから始めてDataSets/Framesに向かいます。技術的にはこれは抽象化レベルを「上がる」わけではありませんが、生のRDDに行かない限り、あなたができないことはまだあります。 –

+0

@maasg私はYuvalに同意します。私はDataSets/DataFramesと比較して、SparkをRDDで学習する方が簡単で有益であると感じています。 –

答えて

1

答えは、スパークが分散エンジンであるという事実にあります。あなたが直面している問題の大まかなアイデアをお伝えします。ここで、各RDDの要素はPartitionsにバケットで入れられ、それぞれPartitionは異なるノードに潜在的に存在します。

rdd1.foreach(f)と書くと、fはクロージャー内にラップされます(対応するオブジェクトのコピーを取得します)。今度は、このクロージャーがシリアル化され、各ノードに送信され、そこで各要素に対して適用されます(Partition)。ここで

、あなたのfは、そのラップ閉鎖にattributescopyを取得しますので、fが実行されたとき、それはあなたが望むattributesattributesやないの、そのコピーと相互作用。その結果、attributesはそのまま残ります。

私はこの問題がはっきりしていることを願っています。

val yourRdd = sc.parallelize(List(
    ("name1","21","JobA"), 
    ("name2","21","JobB"), 
    ("name3","22","JobA") 
)) 

val yourNeededRdd = yourRdd 
    .flatMap({ case (name, age, work) => List(("name", name), ("age", age), ("work", work)) }) 
    .groupBy({ case (attrName, attrVal) => attrName }) 
    .map({ case (attrName, group) => (attrName, group.toList.map(_._2).distinct }) 

// RDD(
//  ("name", List("name1", "name2", "name3")), 
//  ("age", List("21", "22")), 
//  ("work", List("JobA", "JobB")) 
//) 

// Or 

val distinctNamesRdd = yourRdd.map(_._1).distinct 
// RDD("name1", "name2", "name3") 

val distinctAgesRdd = yourRdd.map(_._2).distinct 
// RDD("21", "22") 

val distinctWorksRdd = yourRdd.map(_._3).distinct 
// RDD("JobA", "JobB") 
+0

あなたの説明をありがとう、私はまだスパークを学んでいるので、これは多くの助けになりました。私の問題は今はっきりしていますが、まだ解決策はありません。私はスパーク変換/アクションだけで私の望む結果を達成しようとするかもしれません。 – robinki

+0

あなたの要件を解決する方法はほとんどありません。 –

+0

あなたのソリューションのアプローチに感謝します。これは、特に私が最初にファイルから属性名を読み込み、ハードコードしたくないために、可変数の属性でより複雑になりました。 – robinki

関連する問題