私は大量のファイル(〜500k hdf5)をs3バケット内に持っています。これを処理して別のs3バケットに再アップロードする必要があります。スパークで多数のs3ファイルをダウンロード、処理、アップロード
私はこのようなタスクにはかなり新しいので、私のアプローチが正しいかどうかはまだ分かりません。私は、次の手順を実行します 私はバケット内のキーのリストを取得し、火花とそれを並列化するのbotoを使用します。
download_process_upload
キーで指定されたファイルをダウンロードする機能です
s3keys = bucket.list()
data = sc.parallelize(s3keys)
data = data.map(lambda x: download_process_upload(x))
result = data.collect()
、それにいくつかの処理を行います(エラーが発生した場合、すべてが成功した場合は1を返し、0)と、私はmap
文はステートレスである必要があり、その火花を読んだことがあるので、最後に、私は
success_rate = sum(result)/float(len(s3keys))
を行うことができ 別のバケツにそれを、アップロードし直します私のカスタムマップ機能は間違いなくステートレスではありません。ファイルをディスクにダウンロードしてメモリなどにロードします。
これは適切な方法ですか?