2017-10-10 12 views
0

私は以下のコードを持っています。基本的に私がしようとしているのは、既存の列の値からいくつかの新しい列を生成することです。私がそれをした後、私はクラスタ内のテーブルとして新しい列を持つデータフレームを保存します。申し訳ありませんが、私はまだpysparkするのは新しいです。Pyspark:ラムダ関数と.withColumnを使用すると、型のエラーが発生します。理解できません。

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 
from pyspark.sql.functions import udf, array 
from pyspark.sql.types import DecimalType 
import numpy as np 
import math 

df = sqlContext.sql('select * from db.mytable') 

angle_av = udf(lambda (x, y): -10 if x == 0 else math.atan2(y/x)*180/np.pi, DecimalType(20,10)) 

df = df.withColumn('a_v_angle', angle_av(array('a_v_real', 'a_v_imag'))) 

df.createOrReplaceTempView('temp') 

sqlContext.sql('create table new_table as select * from temp') 

これらの操作は、実際にすべてのエラーを生成しません。私は、(私はこのことから推測している操作が実際に実行されたときである)テーブルとしてDFを保存し、次のエラーを取得しようとすると:

File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 171, in main 
    process() 
    File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 166, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 103, in <lambda> 
    func = lambda _, it: map(mapper, it) 
    File "<string>", line 1, in <lambda> 
    File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 70, in <lambda> 
    return lambda *a: f(*a) 
    File "<stdin>", line 14, in <lambda> 
TypeError: unsupported operand type(s) for /: 'NoneType' and 'NoneType' 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
+0

plsはエラー – desertnaut

+0

にエラーが発生し、コマンドを生成する正確なコマンドをポストです:sqlContext.sql( '一時SELECT * FROMとしてNEW_TABLE作成') –

答えて

0

入力値がnull/Noneているので、これが起こります。関数はその入力をチェックし、それに従って処理を進めるべきです。

f x == 0 or x is None 

か、単に

if not x 
+0

ありがとう!これは問題を解決しました!とても有難い! –

関連する問題