2016-06-26 11 views
2

HDFSとローカルに保存する必要がある中間データがあります。私はスパーク1.6を使用しています。中間形式のHDFSでは、データは/output/testDummy/part-00000/output/testDummy/part-00001になります。私は/users/home/indexes/index.nt(両方ともローカルでマージ)または/users/home/indexes/index-0000.nt/home/indexes/index-0001.ntとして保存できるように、これらのパーティションをJava/Scalaを使ってローカルに保存したいと思います。イテレータでmapPartitionを使用してスパークRDDを保存する

ここに私のコードです: 注:testDummyはtestと同じですが、出力は2つのパーティションです。私はそれらを別々に保存するか、またはローカルに組み合わせて、index.ntファイルと一緒に保存したいと思います。私は別々に2つのデータノードに格納することを好みます。私はクラスタを使用しており、YARNでスパークジョブを送信しています。私はまた、いくつかのコメント、何回、何のデータを取得しているのかを追加しました。どうすればいいですか?どんな助けもありがとうございます。

val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy") 
println("testDummy done") //1 time print 

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = { 
    println("Inside savesData")         // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2 
    println("iter size"+iterator.size)       // 2 735 2 735 values 
    val filenamesWithExtension = outputPath + "/index.nt" 
    println("filenamesWithExtension "+filenamesWithExtension.length) //4 times 
    var list = List[(String)]() 

    val fileWritter = new FileWriter(filenamesWithExtension,true) 
    val bufferWritter = new BufferedWriter(fileWritter) 

    while (iterator.hasNext){      //iterator.hasNext is false 
     println("inside iterator")     //0 times 
     val dat = iterator.next() 
     println("datadata "+iterator.next()) 

     bufferWritter.write(dat + "\n") 
     bufferWritter.flush() 
     println("index files written") 

     val dataElements = dat.split(" ") 
     println("dataElements")         //0 
     list = list.::(dataElements(0)) 
     list = list.::(dataElements(1)) 
     list = list.::(dataElements(2)) 
    } 
    bufferWritter.close() //closing 
    println("savesData method end")       //4 times when coal=2 
    list.iterator 
} 

println("before saving data into local")        //1 
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData) 
println("testRDD partitions "+test.getNumPartitions)        //2 
println("testRDD size "+test.collect().length)        //0 
println("after saving data into local") //1 

PS:私はthisthis、続いなく、まったく同じ私が探しているものを、私は何とかやったけどindex.nt

+2

Scalaは 'list::(dataElements(2))'を 'dataElements(2):: list'と同等にすることで、世界をもう少し良くしていますので、Scalaを悲しくしないでください。少なくともメソッドがオペレータのようなものである場合ちなみに、 'ListBuffer'はおそらくもっと適切でしょう – Dici

答えて

5

で物事のカップルを何も得ていない:

  • あとでデータを使用する場合は、Iterator.sizeに電話をかけないでください。 IteratorsTraversableOnceです。 Iteratorを計算する唯一の方法は、すべての要素をトラバースすることです。その後は、読み取るデータがなくなります。
  • 副作用としてmapPartitionsのような変換を使用しないでください。 foreach/foreachPartitionのような種類のIO使用アクションを実行する場合。悪い習慣であり、与えられたコードが一度だけ実行されることを保証するものではありません。
  • アクションまたは変換内のローカルパスは、特定のワーカーのローカルパスです。クライアントマシンに直接書き込む場合は、まずデータをcollectまたはtoLocalIteratorでフェッチする必要があります。分散ストレージに書き込んで後でデータをフェッチする方が良いかもしれません。
0

Java 7は、ディレクトリを監視する手段を提供します。

https://docs.oracle.com/javase/tutorial/essential/io/notification.html

アイデアは、時計のサービスを作成し、関心のディレクトリ(ファイルの作成、削除、など、のようなあなたの興味のあるイベントを、言及)に登録することで、見てください、あなたは次のようになります作成、削除などのイベントが通知されたら、必要な操作を実行することができます。

適用可能な場合は、Java hdfs apiに大きく依存する必要があります。

イベントを永遠に待つため、バックグラウンドでプログラムを実行します。 (あなたが望むことをした後でロジックを書くことができます)

一方、シェルスクリプトも役に立ちます。

ファイルの読み取り中にhdfsファイルシステムのコヒーレンシモデルに注意してください。

これはいくつかの考えに役立ちます。

関連する問題