私はRDD[String]
、wordRDD
を持っています。また、文字列/単語からRDD [String]を作成する関数もあります。私はwordRDD
の各文字列のための新しいRDD を作成したいと思います。ここに私の試みです:RDDからRDDのコレクションを作成する方法は?
1)スパークは、ネストされたRDDSサポートしていないため失敗しました:
var newRDD = wordRDD.map(word => {
// execute myFunction()
(new MyClass(word)).myFunction()
})
2)おそらくスコープの問題に(失敗):?
var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray){
newRDD = sc.union(newRDD,(new MyClass(w)).myFunction())
}
私の理想的な結果だろう次のようになります。
// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...)
// myFunction behavior
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl')
// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...)
ここで関連する質問が見つかりました:Spark when union a lot of RDD throws stack overflow error問題。
これはどのように並行して実行する必要がありますか? 'wordRDD.map'の中で起こることは全て、クラスタ上で実行されます。したがって、内部の 'collect'は、実行中のジョブの中から新しいSparkジョブを起動する必要があります。私はそれが分散して動かないと思う。 –
彼はまた、RDDの代わりに配列を返すように関数を変更することもできますが、実際の関数を指定することはできません。 –
しかし、彼の説明は、彼は、機能を持っている私はそれが 'RDD [文字列]'から文字列/単語を作成し 'myFunction'だ前提としていたという。 –