2016-10-03 3 views
1

マップ関数を使ってsparkでデータセットを実行するという複雑な関数があります。それは別のpythonモジュールにあります。 mapが呼び出されると、executorノードはそのコードを持たず、マップ関数は失敗します。Sparkに別のモジュールのコードを見せるにはどうすればいいですか?

s_cobDates = getCobDates() #returns a list of dates 
sb_dataset = sc.broadcast(dataset) #fyi - it is not trivial to slice this into chunks per date 

def sparkInnerLoop(n_cobDate): 
    n_dataset = sb_dataset.value 
    import someOtherModule 
    return someOtherModule.myComplicatedCalc(n_dataset) 

results = s_cobDates.map(sparkInnerLoop).collect() 

スパークは、myOtherModuleをインポートできないため、失敗します。

これまでのところ、someOtherModuleを含むpythonパッケージを作成し、sparkジョブより先にクラスターにデプロイしていますが、これはラピッドプロトタイピングには向いていません。

すべてのコードを "sparkInnerLoop"にインライン展開せずに、完全なコードを実行者ノードに送信するにはどうすればよいですか?そのコードは私のソリューションのどこかで使われているので、コードの重複は望んでいません。

私は8ノードクラスタをスタンドアローンモードで使用しています(v 1.6.2)。ドライバはpycharmの私のワークステーションで動作しています。

答えて

0

上記の答えはうまくいきます。あなたのモジュールがパッケージの一部である場合はうまくいきます。その代わりに、あなたのモジュールを圧縮し、あなたのsparkコンテキストにzipファイルを追加することが可能です。そして、彼らは正しいパッケージ名を持っています。

def ziplib(): 
    libpath = os.path.dirname(__file__) # this should point to your packages directory 
    zippath = r'c:\Temp\mylib-' + randstr.randstr(6) + '.zip' 
    zippath = os.path.abspath(zippath) 
    zf = zipfile.PyZipFile(zippath, mode='w') 
    try: 
     zf.debug = 3 # making it verbose, good for debugging 
     zf.writepy(libpath) 
     return zippath # return path to generated zip archive 
    finally: 
     zf.close() 

sc = SparkContext(conf=conf) 

zip_path = ziplib() # generate zip archive containing your lib 
zip_path = pathlib.Path(zip_path).as_uri() 
sc.addPyFile(zip_path) # add the entire archive to SparkContext 
+0

これは非常にうまくいきました – ThatDataGuy

関連する問題