2016-10-28 22 views
1

何の属性を持っていない私はPySparkエラー:はAttributeErrorは:「NoneType」オブジェクトは、「_jvm」

の形式であるタイムスタンプデータセットを持っていると私は、このデータセットを処理して、キー値のマップとして返​​すようにpysparkでUDFを書かれています。しかし、エラーメッセージの下になっています。

データセット:df_ts_list

+--------------------+ 
|    ts_list| 
+--------------------+ 
|[1477411200, 1477...| 
|[1477238400, 1477...| 
|[1477022400, 1477...| 
|[1477224000, 1477...| 
|[1477256400, 1477...| 
|[1477346400, 1476...| 
|[1476986400, 1477...| 
|[1477321200, 1477...| 
|[1477306800, 1477...| 
|[1477062000, 1477...| 
|[1477249200, 1477...| 
|[1477040400, 1477...| 
|[1477090800, 1477...| 
+--------------------+ 

Pyspark UDF:

>>> def on_time(ts_list): 
...  import sys 
...  import os 
...  sys.path.append('/usr/lib/python2.7/dist-packages') 
...  os.system("sudo apt-get install python-numpy -y") 
...  import numpy as np 
...  import datetime 
...  import time 
...  from datetime import timedelta 
...  ts = np.array(ts_list) 
...  if ts.size == 0: 
...    count = 0 
...    duration = 0 
...    st = time.mktime(datetime.now()) 
...    ymd = str(datetime.fromtimestamp(st).date()) 
...  else: 
...    ts.sort() 
...    one_tag = [] 
...    start = float(ts[0]) 
...    for i in range(len(ts)): 
...      if i == (len(ts)) - 1: 
...        end = float(ts[i]) 
...        a_round = [start, end] 
...        one_tag.append(a_round) 
...      else: 
...        diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i]))) 
...        if abs(diff.total_seconds()) > 3600: 
...          end = float(ts[i]) 
...          a_round = [start, end] 
...          one_tag.append(a_round) 
...          start = float(ts[i+1]) 
...    one_tag = [u for u in one_tag if u[1] - u[0] > 300] 
...    count = int(len(one_tag)) 
...    duration = int(np.diff(one_tag).sum()) 
...    ymd = str(datetime.datetime.fromtimestamp(time.time()).date()) 
...  return {'count':count,'duration':duration, 'ymd':ymd} 

Pysparkコード:

>>> on_time=udf(on_time, MapType(StringType(),StringType())) 
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show() 

エラー:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main 
    process() 
    File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda> 
    func = lambda _, it: map(mapper, it) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda> 
    mapper = lambda a: udf(*a) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda> 
    return lambda *a: f(*a) 
    File "<stdin>", line 27, in on_time 
    File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _ 
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col) 
AttributeError: 'NoneType' object has no attribute '_jvm' 

何か助けていただければ幸いです!

答えて

5

udfの27行目で、いくつかのpyspark SQL関数が呼び出されているというエラーメッセージが表示されます。それはabs()と線ですので、あなたの上のどこかでfrom pyspark.sql.functions import *と呼んで、それはPythonのabs()の機能を無効にします。

0

マリウスの答えは本当に私を助けませんでした。だから、私が好きなのは、これがGoogleの唯一の結果であり、あなたがpyspark(そして一般的にはspark)を初めて使ったからです。

私の場合は、pyspark環境が設定される前にpysparkコードを実行しようとしていたため、私のエラーが発生しました。

pyspark.sql.functionsに依存する通話を行う前に、pysparkが利用可能でセットアップされていることを確認して、問題を解決しました。

関連する問題