2016-03-20 13 views
1

にカスタム関数を適用し、次のスニペットはPySpark RDDオブジェクトに単純な関数を適用しようとすると、外部モジュールを使用する:PySpark

import pyspark 
conf = pyspark.SparkConf() 
conf.set('spark.dynamicAllocation.minExecutors', 5) 
sc = SparkContext(appName="tmp", conf=conf) 
sc.setLogLevel('WARN') 

fn = 'my_csv_file' 
rdd = sc.textFile(fn) 
rdd = rdd.map(lambda line: line.split(",")) 
header = rdd.first() 
rdd = rdd.filter(lambda line:line != header) 
def parse_line(line): 
    ret = pyspark.Row(**{h:line[i] for (i, h) in enumerate(header)}) 
    return ret 
rows = rdd.map(lambda line: parse_line(line)) 
sdf = rows.toDF() 

私はpython my_snippet.pyでプログラムを起動すると、それはそれを文句で失敗します。

def parse_line(line): 
    ret = h:line[i] for (i, h) in enumerate(header) 
    ret['dir'] = dir() 
    return ret 
File "<ipython-input-27-8e46d56b2984>", line 6, in <lambda> 
File "<ipython-input-27-8e46d56b2984>", line 3, in parse_line 
NameError: global name 'pyspark' is not defined 

は、私は次のようにparse_line機能を置き換えます

データフレームが作成され、dirの列には、 内のネームスペースに、lineretという2つのオブジェクトしか含まれていないことが示されています。関数の一部として他のモジュールとオブジェクトを持つにはどうすればいいですか? pysparkだけでなく他のものもあります。

EDITこのpysparkは、プログラム内で使用できます。関数がmapによって呼び出された場合(そしてfilterreduceなどと仮定しています)、インポートされたモジュールは表示されません。

+0

あなたの質問には次の回答がありますか? http://stackoverflow.com/questions/23256536/importing-pyspark-in-python-shell – Yaron

答えて

0

1)元の質問への回答: 問題の原因は、あなたは使用してコードを実行する必要がありmy_snippet.py のpython火花を提出実行されているように思えmy_snippet.py

2)次の行が存在しない、私のipythonのノート型パーソナル設定で :

import pyspark 
conf = pyspark.SparkConf() 
conf.set('spark.dynamicAllocation.minExecutors', 5) 
sc = SparkContext(appName="tmp", conf=conf) 

「SC」の範囲外で定義されているノートブックの質問に答えるためにipython私のプログラム

3)numpyのを使用するためにnumpyの(またはインストールする必要が他のモジュール) に関する質問への答えは、あなたが)のapt-getまたはピップまたはソースからのインストールを使用して(numpyのをインストールする必要が上のすべてのクラスタ内のノード

+0

あなたは正しいです、 'spark-submit'を実行すると、スタンドアロンプ​​ログラムの場合に問題は解決します。一方、IPythonノートブックを実行したい場合、これは動作しません。 IPythonを 'IPYTHON_OPTS ="ノートブック "pyspark"で起動することはできますが、実行時に 'SparkContext'オプションを変更することはできません。さらに、 'pyspark'の代わりに' numpy'や他のモジュールを使うと、 'spark-submit'はどちらの助けにもなりません –