2015-09-10 11 views
6

私はRDD[String]wordRDDを持っています。また、文字列/単語からRDD [String]を作成する関数もあります。私はwordRDDの各文字列のための新しいRDD を作成したいと思います。ここに私の試みです:RDDからRDDのコレクションを作成する方法は?

1)スパークは、ネストされたRDDSサポートしていないため失敗しました:

var newRDD = wordRDD.map(word => { 
    // execute myFunction() 
    (new MyClass(word)).myFunction() 
}) 

2)おそらくスコープの問題に(失敗):?

var newRDD = sc.parallelize(new Array[String](0)) 
val wordArray = wordRDD.collect 
for (w <- wordArray){ 
    newRDD = sc.union(newRDD,(new MyClass(w)).myFunction()) 
} 

私の理想的な結果だろう次のようになります。

// input RDD (wordRDD) 
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...) 

// myFunction behavior 
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl') 

// after executing myFunction() on each word in wordRDD: 
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...) 

ここで関連する質問が見つかりました:Spark when union a lot of RDD throws stack overflow error問題。

答えて

3

使用flatMapRDD[String]を取得します。

var allWords = wordRDD.flatMap { word => 
    (new MyClass(word)).myFunction().collect() 
} 
+1

これはどのように並行して実行する必要がありますか? 'wordRDD.map'の中で起こることは全て、クラスタ上で実行されます。したがって、内部の 'collect'は、実行中のジョブの中から新しいSparkジョブを起動する必要があります。私はそれが分散して動かないと思う。 –

+0

彼はまた、RDDの代わりに配列を返すように関数を変更することもできますが、実際の関数を指定することはできません。 –

+0

しかし、彼の説明は、彼は、機能を持っている私はそれが 'RDD [文字列]'から文字列/単語を作成し 'myFunction'だ前提としていたという。 –

3

あなたは別のRDD内からRDDを作成することはできません。

しかし、RDD内から使用することができるような他の機能modifiedFunction: String => Seq[String]に、一文字が除去された入力からの全ての単語を生成する、あなたの関数myFunction: String => RDD[String]を書き換えることが可能です。そうすれば、それはあなたのクラスタ上で並行して実行されます。 modifiedFunctionあなたは、単にwordRDD.flatMap(modifiedFunction)を呼び出すことにより、すべての単語と、最終的なRDDを得ることができました。

重要なポイントは、flatMapmapへとflatten変換)を使用することです:あなたが望むよう

def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]") 
    val sc = new SparkContext(sparkConf) 

    val input = sc.parallelize(Seq("apple", "ananas", "banana")) 

    // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...) 
    val result = input.flatMap(modifiedFunction) 
} 

def modifiedFunction(word: String): Seq[String] = { 
    word.indices map { 
    index => word.substring(0, index) + word.substring(index+1) 
    } 
} 
関連する問題