wholeTextFiles
を使用してディレクトリの各ファイルを読み取っています。その後、私はmap
を使ってrddの各要素の関数を呼び出しています。プログラム全体は、各ファイルのわずか50行を使用します。コードは以下の通りです:apache spark:ディレクトリから大容量のファイルを読み取る
def processFiles(fileNameContentsPair):
fileName= fileNameContentsPair[0]
result = "\n\n"+fileName
resultEr = "\n\n"+fileName
input = StringIO.StringIO(fileNameContentsPair[1])
reader = csv.reader(input,strict=True)
try:
i=0
for row in reader:
if i==50:
break
// do some processing and get result string
i=i+1
except csv.Error as e:
resultEr = resultEr +"error occured\n\n"
return resultEr
return result
if __name__ == "__main__":
inputFile = sys.argv[1]
outputFile = sys.argv[2]
sc = SparkContext(appName = "SomeApp")
resultRDD = sc.wholeTextFiles(inputFile).map(processFiles)
resultRDD.saveAsTextFile(outputFile)
ディレクトリの各ファイルのサイズは、私の場合は非常に大きくすることができ、このような理由によりwholeTextFiles
APIの使用は、この場合には非効率的になります。これを行う効率的な方法はありますか?ディレクトリの各ファイルを1つずつ繰り返して考えることもできますが、それも効率が悪いようです。私はスパークするために新しいです。これを行う効率的な方法があるかどうか教えてください。
各ファイルのサイズはどのくらいですか?ファイルをさらに小さなファイルに分割できませんか? –
@DatTran各ファイルのサイズは数Gbsにすることができ、ディレクトリ内のファイルの数は100以上にすることができます。ファイルを分割すると思いますが、各ファイルを1つずつ分割し、一時ディレクトリに保存します。その後、そのtempディレクトリに 'wholeTextFiles'を適用することができます。これはあなたがファイルを分割する方法ですか?そうでない場合は、ファイルを分割する方法を教えてください。 – mcurious