2017-10-03 6 views
0

私は2つのファイルを持っています。 functions.pyには機能があり、その機能からpyspark udfが作成されます。 main.py udfをインポートしようとしました。ただし、main.pyfunctions.pyの機能にアクセスするのに問題があるようです。メインクラスにpyspark UDFをインポートする方法

functions.py:

from pyspark.sql.functions import udf 
from pyspark.sql.types import StringType 

def do_something(x): 
    return x + 'hello' 

sample_udf = udf(lambda x: do_something(x), StringType()) 

main.py:

これはエラーになり
from functions import sample_udf, do_something 
df = spark.read.load(file) 
df.withColumn("sample",sample_udf(col("text"))) 

:私はdo_something機能をバイパスし、すぐ内側にそれを置く場合

17/10/03 19:35:29 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 6, ip-10-223-181-5.ec2.internal, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/lib/spark/python/pyspark/worker.py", line 164, in main 
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 93, in read_udfs 
    arg_offsets, udf = read_single_udf(pickleSer, infile) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 79, in read_single_udf 
    f, return_type = read_command(pickleSer, infile) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 55, in read_command 
    command = serializer._read_with_length(file) 
    File "/usr/lib/spark/python/pyspark/serializers.py", line 169, in _read_with_length 
    return self.loads(obj) 
    File "/usr/lib/spark/python/pyspark/serializers.py", line 454, in loads 
    return pickle.loads(obj) 
AttributeError: 'module' object has no attribute 'do_something' 

udf、例えば:udf(lambda x: x + ' hello', StringType())、UDFのインポートは問題ありませんが、私の機能はもう少し長くなります別の機能でカプセル化されているといいですね。これを達成する正しい方法は何ですか?

+0

したがって、関数をインポートしてメイン関数にカプセル化することができます。つまり、 'udf'に関数をラップします。しかし、udfだけを使用しているので、あなたは 'do_something'をインポートせずにそれを実行しようとしましたか?インポートは同じになりすべてがUDF をインポートし、メインで 'do_something'を作成します*メイン でUDFを作成し、ちょうどUDF *インポート' do_something' * :私はのバリエーションを試してみた – Chinny84

+0

残念ながら – ayplam

+1

@ayplamあなたのエグゼキュータで利用できるように、sparkcontextにpyファイルを追加してください。 はここに私のテストのノート あるsample_udf機能から sc.addPyFile( "functions.py") インポートhttps://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3669221609244155/3140647912908320/868274901052987/ latest.html –

答えて

2

だけの答えとしてこれを追加する: -

はあなたの執行が利用できるようにするためにsparkcontextためにあなたのPYファイルを追加します。

sc.addPyFile("functions.py") 
from functions import sample_udf 

は、ここに私のテストノート

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3669221609244155/3140647912908320/868274901052987/latest.html

おかげで、 チャールズです。

関連する問題