2016-03-25 20 views
1

私はpythonの依存関係をパッケージ化して、​​というハープ・クラスタに送信しようとしています。これはDRYestの方法で可能です。外部の__main__ファイルからのSparkContextをspark-submitに変更

私は私のmy_spark_app.pyは次のようになりたい:

from pyspark import SparkContext, SparkConf 

conf = SparkConf().setAppName('MyApp').setMaster('yarn-client') 
sc = SparkContext(conf=conf) 

sc.addPyFile('/path/to/dependencies.py') 
from dependencies import DependencyManager 
dm = DependencyManager(sc) 

dm.register_lib('dateutil') 
import dateutil 

# do stuff with dateutil 

そしてdependencies.pyはこれです:

import zipfile, os 

LIBPATH = '/path/to/my/python/env/lib/python2.7/site-packages/' 

class DependencyManager(object): 
    """ 
    Collects dependencies to be zipped and sent to the spark context 
    """ 
    def __init__(self, spark_context): 
     self.sc = spark_context 

    def register_lib(self, p): 
     libpath = os.path.join(LIBPATH, p) 
     zippath = libpath + '.zip' 
     zf = zipfile.PyZipFile(zippath, mode='w') 
     try: 
      zf.debug = 3 
      zf.writepy(libpath) 
      self.sc.addPyFile(zippath) 
     finally: 
      zf.close() 

これは、(理由はzf.debug = 3の)これを生成します:どういうわけか

Adding package in /path/to/env/lib/python2.7/site-packages/dateutil as dateutil 
Adding dateutil/__init__.pyc 
Adding dateutil/rrule.pyc 
Adding dateutil/relativedelta.pyc 
Adding package in /path/to/env/lib/python2.7/site-packages/dateutil/zoneinfo as dateutil/zoneinfo 
Adding dateutil/zoneinfo/__init__.pyc 
Adding dateutil/zoneinfo/rebuild.pyc 
Adding dateutil/parser.pyc 
Adding dateutil/tzwin.pyc 
Adding dateutil/easter.pyc 
Adding package in /path/to/env/lib/python2.7/site-packages/dateutil/tz as dateutil/tz 
Adding dateutil/tz/__init__.pyc 
Adding dateutil/tz/tz.pyc 
Adding dateutil/tz/win.pyc 
Adding dateutil/tz/_common.pyc 
Traceback (most recent call last): 
    File "/path/to/my_spark_app.py", line 25 
    import dateutil 
ImportError: No module named dateutil 

、wからself.sc.addPyFile()を呼び出すDependencyManagerクラスは、my_spark_app.pyで直接問題なく動作しますが、SparkContextには影響しません。

ここでは何が起こっていますか?

答えて

1

問題は簡単で、スパークとはほとんど関係がありません。ここで :self.sc.addPyFile(zippath)zf ioはまだ開いていると呼ばれている

def register_lib(self, p): 
    libpath = os.path.join(LIBPATH, p) 
    zippath = libpath + '.zip' 
    zf = zipfile.PyZipFile(zippath, mode='w') 
    try: 
     zf.debug = 3 
     zf.writepy(libpath) 
     self.sc.addPyFile(zippath) 
    finally: 
     zf.close() 

。電話する前に電話を切るだけでいいです:

def register_lib(self, p): 
    libpath = os.path.join(LIBPATH, p) 
    zippath = libpath + '.zip' 
    zf = zipfile.PyZipFile(zippath, mode='w') 
    try: 
     zf.debug = 3 
     zf.writepy(libpath) 
     zf.close() # file is now ready to add to the spark context 
     self.sc.addPyFile(zippath) 
    finally: 
     zf.close() 
+0

ありがとう、魅力的です –

関連する問題