2016-04-07 15 views
1

私はかなり簡単なことをしようとしています。私はデータフレームの一部としてdatetimeオブジェクトを持っています。地図を作成するときに、日付を特定の方法でフォーマットしたいと思います。提出されたときにPyspark - ラムダ内で関数を呼び出すとインポートエラーが発生する

unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\ 
     .reduceByKey(lambda x,y: x+y)\ 
     .collectAsMap() 

これは、次の例外が発生します。私は、カスタム関数を作成しました:

def format_date(dt): 
    """Set this for date formatting. dt is datetime.""" 
    return dt.strftime("%Y/%m/%d %H:%M:%S") 

をそして後に、私は私のマップの呼び出しでこれを使用する(XTはdatetimeオブジェクトです)私のスクリプト名は「run_analyses.py」、および「analysis.py」からの輸入すべての機能がある

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 9, preteckt1.softlayer.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main 
    command = pickleSer._read_with_length(infile) 
    File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
ImportError: No module named analysis 

注:仕事として。私は奇妙なことは、私はインタラクティブpysparkセッション(または私はたformat_date呼び出しを削除する場合)にコードをコピーする場合、それは完全に正常に動作していることである

/opt/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --total-executor-cores 12 run_analyses.py 

でジョブをサブミットします。私はこれを回避するには、新しい列を作成し、format_date関数でUDFを使用して新しい列を作成する方法がありますが、このアプローチが失敗する理由を知りたいと思います。

私は以下のより完全なコードを貼り付けました。

:私がanalysis.pyから直接コードを実行すると成功すると思われますが、run_analysis.pyから実行すると失敗します。私はこれをより正確に示すために以下のコードを変更しました。

run_analyses.py

import datetime, json, math, subprocess 
from os.path import expanduser 
from pyspark import SparkContext 
from pyspark.sql import SQLContext, HiveContext 
from analysis import * 

sc = SparkContext() 
sqlCtx = HiveContext(sc) 
ids = {} 
... 
my_func(sqlCtx,ids) 

analysis.py

def my_func(sqlCtx,ids): 
    df = sqlCtx.read.format("org.apache.spark.sql.cassandra").load(table="table_name", keyspace="keyspace_name").select("id","t","val") 
    df = df.filter((df.t > last_week)&(df.t < now)) 
    df = df.filter(df.val > 0) 
    write_vals(df) 
    ... 

def write_vals(df): 
    unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\ 
      .reduceByKey(lambda x,y: x+y)\ 
      .collectAsMap() 
    ... 
    return 

答えて

2

キーはトレースバックである:

ImportError: No module named analysis 

PySparkは、ワーカープロセスが持っていないことを語っていますanalyze.pyへのアクセス。

sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) 

詳細情報:: https://spark.apache.org/docs/0.9.0/python-programming-guide.html#standalone-use

+0

それをやったあなたがSparkContextを初期化するときは、作業者にコピーする必要があるファイルのリストを渡すことができます!ありがとう:) sc = SparkContext(pyFiles = ['analysis.py'])に私のscインスタンス化を変更しました。 –

関連する問題