2016-06-21 7 views
2

クラスタ上でsparkアプリケーションを実行しています。私は、RDD内の各要素に対して何らかの操作を行い、各要素をテキストファイルに保存したいと考えています。RDDの要素をSparkアプリケーションに保存する

私はしかし、私はprint文は、すべてのエラー/警告なしで印刷されることを見つけるんがsomefile.txtを見つけることができないのですmyRDD

myRDD.foreach(process) 

    def process(elements): 
     // some operation that extracts the strings 
     // and converts to myList 
     myList = ... 

     with open("somefile.txt", "a+") as myfile: 
      print "----SAVED----" 
      myfile.writelines(myList) 

foreachを呼び出しています。 somefile.txtはどこに保存されていますか?私の方法が間違っている場合、どのようにRDDの個々の要素を保存するのですか?

+0

foreachを使用してmyListを保存しますか?可能であればどうすればRDD上の各要素の操作を行い、リストの形で生成された出力を保存するのですか? – vkb

答えて

2
myRDD.map(convertToList).saveAsTextFile(<hdfs output path>) 

あなたは、ドライバにすべてのデータを転送する必要がある場合、その後、あなたは、出力データは、ドライバメモリに収まるように十分に小さいであることを保証がするかでしょう、あなたのアプリケーションを拡張することができるようになりますトラブルが始まります。

あなたは1つだけのファイル(このアプローチはスケーラブルで、ドライバーにすべての出力を転送するよりも、同様の問題を持っていない)でそのすべてのデータの終了が必要な場合:あなたを変換する必要がある場合

myRDD.map(generateList).coalesce(1).saveAsTextFile(<hdfs output path>) 

をその前に、文字列内のリストには、ファイルに保存されている:

myRDD.map(generateList).map(listToString).saveAsTextFile(<hdfs output path>) 

はもちろん、あなたが最初のマップの中の文字列にあなたのリストを変換し、余分なステップを保存することができます。

2

これが問題である理由は、RDDが必ずしも単一のノードにあるわけではないからです。 foreachに電話をかけたときのRDDは、ノード間で分散されます。あなたは良いかもしれないcollectまたは

collectuse the built in file writer, but this won't modify it.を使用してドライバノードへのごRDDを収集するために、いずれかの必要があるが、今、すべてのデータを単一のノードに収集されているので、それはまた、ボトルネック(ドライバー・ノード。)

編集:私は効率的でありながら、あなたが欲しいものを行う必要がありますいくつかのコードを使用して、追加の質問...

def process(element): 
    #process element to a list 
    return myList 

def writeList(myList): 
    with open('somefile.txt', 'a+') as f: 
    f.writelines(myList) 

#in main 
myListRDD = myRDD.map(process) 
myListRDD.collect().foreach(writeList) 

にお答えするつもりです。リストのための新しいRDDを処理するので、すべての処理を並行して行うことができるので、データの一貫性のために単一のノードで実行する必要があるファイル書き込みが唯一の線形操作です。このアプローチで

+0

year、ドライバノードにmyListを戻すにはどうすればいいですか? – vkb

+0

@vkbあなたは 'val myRDDArr = myRDD.collect()'を実行するだけで、すべてのRDDデータをドライバノード上の 'myRDDArr'に格納された配列にプルします。 –

+0

各要素は関数 'process'に渡されます。この関数では、リストを返す操作が実行されます。私の最終目標は、リストをテキストファイルに保存することです。あなたはmyRDD.foreach(process).collect()を意味しますか? – vkb

関連する問題