2016-10-20 7 views
0

私はSparkとScalaの学習を始めました。リストの反復を並列化してSparkでRDDを作成する方法は?

collectを使用するのは悪いことですが、メモリ内のデータ全体が収集され、forを使用することも悪い習慣です。ブロック内のコードは複数のノードで同時に実行されないためです。

List(1,2,3,4,5,6,7,8,9,10)

と私は、この値を使用してRDDを生成する必要がこれらの値のそれぞれについて:

は今、私は1から10までの数字のリストを持っています。

この場合、RDDはどのように生成できますか?

sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)).map(number => generate_rdd(number)) 

を行うことにより、RDDは別のRDD内部で発生することができないので、私はエラーを取得します。

この問題を解決する最良の方法は何ですか?

+0

なぜ値ごとにrddを作成したいですか? rddは値/インスタンスの集合として定義されており、各値に対してインスタンスをrddとして作成したいとしますか? – hasan

+0

@ハサン私は、データ全体の異なるサブセットを使用する必要があります。各繰り返しでデータをフィルタリングするには数値が必要です – Vektor88

+0

次にmapの代わりに.filterを試してみてください。フィルタリングされた(正の)値のrddが返されます。 – hasan

答えて

1

のように定義すると、generate_rddmapの代わりにflatMapとなります。

sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)).flatMap(number => generate_rdd(number)) 

これは、作成したいRDDSの数が低く、従ってだろうと仮定すると、10

1

に1からの数字のために作成されたすべてのRDDSの連結があるRDDを与えるだろうその並列化自体をRDDで行う必要はないので、代わりにScalaの並列コレクションを使用することができます。たとえば、次のコードを使用して、約40個のHDFSファイルの行数を同時にカウントしようとしました[区切り文字の設定を無視します。改行区切りのテキストの場合、これはうまく] sc.textFileに置き換えられていることができます:ここで

val conf = new Configuration(sc.hadoopConfiguration) 
conf.set("textinputformat.record.delimiter", "~^~") 
val parSeq = List("path of file1.xsv","path of file2.xsv",...).par 
parSeq.map(x => { 
    val rdd = sc.newAPIHadoopFile(x, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) 
    println(rdd.count()) 
}) 

がスパークUIでの出力の一部です。このように、RDDカウント動作の大部分は同時に開始された。

Spark UI

関連する問題