私はかなり簡単なことをしようとしています。私はデータフレームの一部としてdatetimeオブジェクトを持っています。地図を作成するときに、日付を特定の方法でフォーマットしたいと思います。提出されたときにPyspark - ラムダ内で関数を呼び出すとインポートエラーが発生する
がunique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\
.reduceByKey(lambda x,y: x+y)\
.collectAsMap()
これは、次の例外が発生します。私は、カスタム関数を作成しました:
def format_date(dt):
"""Set this for date formatting. dt is datetime."""
return dt.strftime("%Y/%m/%d %H:%M:%S")
をそして後に、私は私のマップの呼び出しでこれを使用する(XTはdatetimeオブジェクトです)私のスクリプト名は「run_analyses.py」、および「analysis.py」からの輸入すべての機能がある
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 9, preteckt1.softlayer.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
ImportError: No module named analysis
注:仕事として。私は奇妙なことは、私はインタラクティブpysparkセッション(または私はたformat_date呼び出しを削除する場合)にコードをコピーする場合、それは完全に正常に動作していることである
/opt/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --total-executor-cores 12 run_analyses.py
でジョブをサブミットします。私はこれを回避するには、新しい列を作成し、format_date関数でUDFを使用して新しい列を作成する方法がありますが、このアプローチが失敗する理由を知りたいと思います。
私は以下のより完全なコードを貼り付けました。
:私がanalysis.pyから直接コードを実行すると成功すると思われますが、run_analysis.pyから実行すると失敗します。私はこれをより正確に示すために以下のコードを変更しました。
run_analyses.py
import datetime, json, math, subprocess
from os.path import expanduser
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
from analysis import *
sc = SparkContext()
sqlCtx = HiveContext(sc)
ids = {}
...
my_func(sqlCtx,ids)
analysis.py
def my_func(sqlCtx,ids):
df = sqlCtx.read.format("org.apache.spark.sql.cassandra").load(table="table_name", keyspace="keyspace_name").select("id","t","val")
df = df.filter((df.t > last_week)&(df.t < now))
df = df.filter(df.val > 0)
write_vals(df)
...
def write_vals(df):
unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\
.reduceByKey(lambda x,y: x+y)\
.collectAsMap()
...
return
それをやったあなたがSparkContextを初期化するときは、作業者にコピーする必要があるファイルのリストを渡すことができます!ありがとう:) sc = SparkContext(pyFiles = ['analysis.py'])に私のscインスタンス化を変更しました。 –