Google Storage(my project bucket)からダウンロードするデータを処理するPythonオブジェクトの実行を並列化するアプリケーションがあります。クラスタはGoogle Dataprocを使用して作成されます。問題は、データが決してダウンロードされないということです!問題を理解して理解するためのテストプログラムを書いた。 私はバケツからファイルをコピーするために、労働者上のファイルを作成する作業を行うかどうかを確認するために、次の関数を書いた:Spark(Python)とDataprocを使用してGoogleストレージからファイルをダウンロード
from subprocess import call
from os.path import join
def copyDataFromBucket(filename,remoteFolder,localFolder):
call(["gsutil","-m","cp",join(remoteFolder,filename),localFolder]
def execTouch(filename,localFolder):
call(["touch",join(localFolder,"touched_"+filename)])
私はPythonシェルからそれを呼び出すことで、この機能をテストしてみたし、それが動作します。しかし、私は火花提出を使って、次のコードを実行すると、ファイルがダウンロードされません(ただし、エラーは発生しません):
# ...
filesRDD = sc.parallelize(fileList)
filesRDD.foreach(lambda myFile: copyDataFromBucket(myFile,remoteBucketFolder,'/tmp/output')
filesRDD.foreach(lambda myFile: execTouch(myFile,'/tmp/output')
# ...
execTouch関数作品(私は各ワーカー上のファイルを見ることができる)が、copyDataFromBucket機能何もしません。
どうしたのですか?
精度:アプリケーションを実行するにはAnaconda2パッケージを使用しますが、gsutilが動作するようにCLOUDSDK_PYTHON変数を/ usr/bin/pythonに設定する必要があります。 – ma3oun
ifあなたは 'gsutil -m cp ...'をbashやシェルで実行していましたが、それは現在動作していますか? – Kristian
はい、マスターとワーカーの両方でうまくいきます。 – ma3oun