2016-06-17 3 views
0

私は大量のファイル(〜500k hdf5)をs3バケット内に持っています。これを処理して別のs3バケットに再アップロードする必要があります。スパークで多数のs3ファイルをダウンロード、処理、アップロード

私はこのようなタスクにはかなり新しいので、私のアプローチが正しいかどうかはまだ分かりません。私は、次の手順を実行します 私はバケット内のキーのリストを取得し、火花とそれを並列化するのbotoを使用します。

download_process_uploadキーで指定されたファイルをダウンロードする機能です
s3keys = bucket.list() 
data = sc.parallelize(s3keys) 
data = data.map(lambda x: download_process_upload(x)) 
result = data.collect() 

、それにいくつかの処理を行います(エラーが発生した場合、すべてが成功した場合は1を返し、0)と、私はmap文はステートレスである必要があり、その火花を読んだことがあるので、最後に、私は

success_rate = sum(result)/float(len(s3keys)) 

を行うことができ 別のバケツにそれを、アップロードし直します私のカスタムマップ機能は間違いなくステートレスではありません。ファイルをディスクにダウンロードしてメモリなどにロードします。

これは適切な方法ですか?

答えて

0

S3からデータをダウンロードして処理する方法論をうまく使っています。私はマップステートメント内からデータをアップロードしようとはしませんでした。しかし、私はあなたがs3からファイルを読み込み、処理してから新しい場所にアップロードすることができない理由は見当たりません。

また、いくつかのキーストロークを保存して明示的なラムダを次のようにマップステートメントから取り除くことができます。data = data.map(download_process_upload)

関連する問題