2017-05-08 18 views
0

wholeTextFilesを使用してディレクトリの各ファイルを読み取っています。その後、私はmapを使ってrddの各要素の関数を呼び出しています。プログラム全体は、各ファイルのわずか50行を使用します。コードは以下の通りです:apache spark:ディレクトリから大容量のファイルを読み取る

def processFiles(fileNameContentsPair): 
    fileName= fileNameContentsPair[0] 
    result = "\n\n"+fileName 
    resultEr = "\n\n"+fileName 
    input = StringIO.StringIO(fileNameContentsPair[1]) 
    reader = csv.reader(input,strict=True) 

    try: 
     i=0 
     for row in reader: 
     if i==50: 
      break 
     // do some processing and get result string 
     i=i+1 
    except csv.Error as e: 
    resultEr = resultEr +"error occured\n\n" 
    return resultEr 
    return result 



if __name__ == "__main__": 
    inputFile = sys.argv[1] 
    outputFile = sys.argv[2] 
    sc = SparkContext(appName = "SomeApp") 
    resultRDD = sc.wholeTextFiles(inputFile).map(processFiles) 
    resultRDD.saveAsTextFile(outputFile) 

ディレクトリの各ファイルのサイズは、私の場合は非常に大きくすることができ、このような理由によりwholeTextFiles APIの使用は、この場合には非効率的になります。これを行う効率的な方法はありますか?ディレクトリの各ファイルを1つずつ繰り返して考えることもできますが、それも効率が悪いようです。私はスパークするために新しいです。これを行う効率的な方法があるかどうか教えてください。

+1

各ファイルのサイズはどのくらいですか?ファイルをさらに小さなファイルに分割できませんか? –

+0

@DatTran各ファイルのサイズは数Gbsにすることができ、ディレクトリ内のファイルの数は100以上にすることができます。ファイルを分割すると思いますが、各ファイルを1つずつ分割し、一時ディレクトリに保存します。その後、そのtempディレクトリに 'wholeTextFiles'を適用することができます。これはあなたがファイルを分割する方法ですか?そうでない場合は、ファイルを分割する方法を教えてください。 – mcurious

答えて

1

私が提案しているのは、まずファイルを小さなチャンクに分割することですが、いくつかのGbsは読み込みには大きすぎるため、遅延の主な原因です。あなたのデータがHDFS上にある場合、ファイルごとに64MBのようなものがあります。それ以外の場合は、ファイルのサイズを調べる必要があります。これは、実行しているエグゼキュータの数によって異なります。したがって、より小さなチャンクがあれば、これを増やしてより並列性を高めることができます。同様に、processFiles関数がCPUを集中的に使用していないように、パーティションを増やして調整することもできます。エグゼキュータをたくさん抱える唯一の問題は、I/Oが増えますが、ファイルサイズが小さい場合は問題のほうが大きくないはずです。

ところで、wholeTextFiles*のようなワイルドカードをサポートしています。また、S3をファイルシステムとして使用すると、大きなファイルではなく読み込み時間が短くなるため、小さなファイルが多すぎるとボトルネックが発生する可能性があります。だからこそ、これは簡単なことではありません。

希望すると便利です。

関連する問題