0

PySparkでブロードキャストオブジェクトを使用するUDFを呼び出そうとしています。ここで PySparkでブロードキャストオブジェクトを使用してUDFを呼び出すときのエラー

は、状況やエラーを再現し、最小限の例です。このスニペットが生成する

import pyspark.sql.functions as sf 
from pyspark.sql.types import LongType 


class SquareClass: 
    def compute(self, n): 
     return n ** 2 


square = SquareClass() 
square_sc = sc.broadcast(square) 

def f(n): 
    return square_sc.value.compute(n) 

numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF() 
f_udf = sf.udf(f, LongType()) 

numbers.select(f_udf(numbers.id)).show(10) 

スタックトレースとエラーメッセージ:

Traceback (most recent call last) 
<ipython-input-75-6e38c014e4b2> in <module>() 
    13 f_udf = sf.udf(f, LongType()) 
    14 
---> 15 numbers.select(f_udf(numbers.id)).show(10) 

/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.py in show(self, n, truncate) 
    255   +---+-----+ 
    256   """ 
--> 257   print(self._jdf.showString(n, truncate)) 
    258 
    259  def __repr__(self): 

/usr/local/lib/python3.5/dist-packages/py4j/java_gateway.py in __call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, 

<snip> 

An error occurred while calling o938.showString. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 49.0 failed 1 times, most recent failure: Lost task 1.0 in stage 49.0 (TID 587, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
+0

あなたはトレースを提供していませんが、それはAttributeError – MaFF

答えて

2

square_scの属性を呼び出すときにあなたがしています作業員に存在しないモジュールSquareClassを呼び出します。

あなたはUDFでPythonのパッケージ、クラス、関数を使用したい場合は、労働者がそれにアクセスすることができるようにする必要があり、あなたはPythonスクリプト内のコードを入れて、あなたを実行するとき--py-filesを使用して、それを展開することによって、これを達成することができます​​、pyspark

1

あなたができることは、クラスを別のモジュールとして保持し、モジュールをsparkContextに追加することです。

class_module.py 

class SquareClass: 
    def compute(self, n): 
     return n ** 2 

pyspark-shell 

    import pyspark.sql.functions as sf 
    from pyspark.sql.types import LongType 
    from class_module import SquareClass 

    sc.addFile('class_module.py') 

    square = SquareClass() 
    square_sc = sc.broadcast(square) 
    def f(n): 
     return square_sc.value.compute(n) 

    f_udf = sf.udf(f, LongType()) 
    numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF() 
    numbers.select(f_udf(numbers.id)).show(10) 
    +-----+ 
    |f(id)| 
    +-----+ 
    | 0| 
    | 1| 
    | 4| 
    | 9| 
    | 16| 
    | 25| 
    | 36| 
    | 49| 
    | 64| 
    | 81| 
    +-----+ 
関連する問題