2

Google Storage(my project bucket)からダウンロードするデータを処理するPythonオブジェクトの実行を並列化するアプリケーションがあります。クラスタはGoogle Dataprocを使用して作成されます。問題は、データが決してダウンロードされないということです!問題を理解して理解するためのテストプログラムを書いた。 私はバケツからファイルをコピーするために、労働者上のファイルを作成する作業を行うかどうかを確認するために、次の関数を書いた:Spark(Python)とDataprocを使用してGoogleストレージからファイルをダウンロード

from subprocess import call 
from os.path import join 

def copyDataFromBucket(filename,remoteFolder,localFolder): 
    call(["gsutil","-m","cp",join(remoteFolder,filename),localFolder] 

def execTouch(filename,localFolder): 
    call(["touch",join(localFolder,"touched_"+filename)]) 

私はPythonシェルからそれを呼び出すことで、この機能をテストしてみたし、それが動作します。しかし、私は火花提出を使って、次のコードを実行すると、ファイルがダウンロードされません(ただし、エラーは発生しません):

# ... 
filesRDD = sc.parallelize(fileList) 
filesRDD.foreach(lambda myFile: copyDataFromBucket(myFile,remoteBucketFolder,'/tmp/output') 
filesRDD.foreach(lambda myFile: execTouch(myFile,'/tmp/output') 
# ... 

execTouch関数作品(私は各ワーカー上のファイルを見ることができる)が、copyDataFromBucket機能何もしません。

どうしたのですか?

+0

精度:アプリケーションを実行するにはAnaconda2パッケージを使用しますが、gsutilが動作するようにCLOUDSDK_PYTHON変数を/ usr/bin/pythonに設定する必要があります。 – ma3oun

+0

ifあなたは 'gsutil -m cp ...'をbashやシェルで実行していましたが、それは現在動作していますか? – Kristian

+0

はい、マスターとワーカーの両方でうまくいきます。 – ma3oun

答えて

1

問題は明らかにSparkのコンテキストでした。 「のHadoop FS」を呼び出して「gsutilの」への呼び出しを交換すると、それを解決します

from subprocess import call 
from os.path import join 

def copyDataFromBucket(filename,remoteFolder,localFolder): 
    call(["hadoop","fs","-copyToLocal",join(remoteFolder,filename),localFolder] 

私もバケットにデータを送信するためのテストをしました。 「-copyToLocal」を「-copyFromLocal」に置き換える必要があります

関連する問題