HDFSとローカルに保存する必要がある中間データがあります。私はスパーク1.6を使用しています。中間形式のHDFSでは、データは/output/testDummy/part-00000
と/output/testDummy/part-00001
になります。私は/users/home/indexes/index.nt
(両方ともローカルでマージ)または/users/home/indexes/index-0000.nt
と/home/indexes/index-0001.nt
として保存できるように、これらのパーティションをJava/Scalaを使ってローカルに保存したいと思います。イテレータでmapPartitionを使用してスパークRDDを保存する
ここに私のコードです: 注:testDummyはtestと同じですが、出力は2つのパーティションです。私はそれらを別々に保存するか、またはローカルに組み合わせて、index.nt
ファイルと一緒に保存したいと思います。私は別々に2つのデータノードに格納することを好みます。私はクラスタを使用しており、YARNでスパークジョブを送信しています。私はまた、いくつかのコメント、何回、何のデータを取得しているのかを追加しました。どうすればいいですか?どんな助けもありがとうございます。
val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
println("testDummy done") //1 time print
def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
println("Inside savesData") // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
println("iter size"+iterator.size) // 2 735 2 735 values
val filenamesWithExtension = outputPath + "/index.nt"
println("filenamesWithExtension "+filenamesWithExtension.length) //4 times
var list = List[(String)]()
val fileWritter = new FileWriter(filenamesWithExtension,true)
val bufferWritter = new BufferedWriter(fileWritter)
while (iterator.hasNext){ //iterator.hasNext is false
println("inside iterator") //0 times
val dat = iterator.next()
println("datadata "+iterator.next())
bufferWritter.write(dat + "\n")
bufferWritter.flush()
println("index files written")
val dataElements = dat.split(" ")
println("dataElements") //0
list = list.::(dataElements(0))
list = list.::(dataElements(1))
list = list.::(dataElements(2))
}
bufferWritter.close() //closing
println("savesData method end") //4 times when coal=2
list.iterator
}
println("before saving data into local") //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions) //2
println("testRDD size "+test.collect().length) //0
println("after saving data into local") //1
PS:私はthisとthis、続いなく、まったく同じ私が探しているものを、私は何とかやったけどindex.nt
Scalaは 'list::(dataElements(2))'を 'dataElements(2):: list'と同等にすることで、世界をもう少し良くしていますので、Scalaを悲しくしないでください。少なくともメソッドがオペレータのようなものである場合ちなみに、 'ListBuffer'はおそらくもっと適切でしょう – Dici