にカスタム関数を適用し、次のスニペットはPySpark RDDオブジェクトに単純な関数を適用しようとすると、外部モジュールを使用する:PySpark
import pyspark
conf = pyspark.SparkConf()
conf.set('spark.dynamicAllocation.minExecutors', 5)
sc = SparkContext(appName="tmp", conf=conf)
sc.setLogLevel('WARN')
fn = 'my_csv_file'
rdd = sc.textFile(fn)
rdd = rdd.map(lambda line: line.split(","))
header = rdd.first()
rdd = rdd.filter(lambda line:line != header)
def parse_line(line):
ret = pyspark.Row(**{h:line[i] for (i, h) in enumerate(header)})
return ret
rows = rdd.map(lambda line: parse_line(line))
sdf = rows.toDF()
私はpython my_snippet.py
でプログラムを起動すると、それはそれを文句で失敗します。
def parse_line(line):
ret = h:line[i] for (i, h) in enumerate(header)
ret['dir'] = dir()
return ret
:
File "<ipython-input-27-8e46d56b2984>", line 6, in <lambda>
File "<ipython-input-27-8e46d56b2984>", line 3, in parse_line
NameError: global name 'pyspark' is not defined
は、私は次のようにparse_line
機能を置き換えます
データフレームが作成され、dir
の列には、 内のネームスペースに、line
とret
という2つのオブジェクトしか含まれていないことが示されています。関数の一部として他のモジュールとオブジェクトを持つにはどうすればいいですか? pysparkだけでなく他のものもあります。
EDITこのpysparkは、プログラム内で使用できます。関数がmap
によって呼び出された場合(そしてfilter
、reduce
などと仮定しています)、インポートされたモジュールは表示されません。
あなたの質問には次の回答がありますか? http://stackoverflow.com/questions/23256536/importing-pyspark-in-python-shell – Yaron