2016-10-15 2 views
0

これは、タイプ。UnsupportedOperationException:式を評価できません:..Column()とudf()で新しい列を追加するとき

だから私はタイプのデータフレームdf_srcを持っている:

<class 'pyspark.sql.dataframe.DataFrame'> 

、ここではそのスキーマです:

root 
|-- src_ip: string (nullable = true) 
|-- year: integer (nullable = true) 
|-- month: integer (nullable = true) 
|-- day: integer (nullable = true) 
|-- hour: integer (nullable = true) 
|-- minute: integer (nullable = true) 

私も以前の機能を宣言:

def parse_df_to_string(year, month, day, hour=0, minute=0): 
second = 0 
return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(year, month, day, hour, minute, second) 

もIテストを行い、それは魅力のように動作します:

from pyspark.sql.functions import udf 
u_parse_df_to_string = udf(parse_df_to_string) 

最後に、この要求:

df_src.select('*', 
       u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute']) 
      ).show() 

を引き起こす:

print parse_df_to_string(2016, 10, 15, 21) 
print type(parse_df_to_string(2016, 10, 15, 21)) 

2016-10-15 21:00:00 
<type 'str'> 
ので、私はまた、UDFとスパークAPIのように似たようなやった

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-126-770b587e10e6> in <module>() 
    25 # Could not make this part wor.. 
    26 df_src.select('*', 
---> 27   u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute']) 
    28    ).show() 

/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate) 
    285   +---+-----+ 
    286   """ 
--> 287   print(self._jdf.showString(n, truncate)) 
    288 
    289  def __repr__(self): 

/opt/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    931   answer = self.gateway_client.send_command(command) 
    932   return_value = get_return_value(
--> 933    answer, self.gateway_client, self.target_id, self.name) 
    934 
    935   for temp_arg in temp_args: 

/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw) 
    61  def deco(*a, **kw): 
    62   try: 
---> 63    return f(*a, **kw) 
    64   except py4j.protocol.Py4JJavaError as e: 
    65    s = e.java_exception.toString() 
    ... 


    Py4JJavaError: An error occurred while calling o5074.showString. 
: java.lang.UnsupportedOperationException: Cannot evaluate expression: parse_df_to_string(input[1, int, true], input[2, int, true], input[3, int, true], input[4, int, true], input[5, int, true]) 
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224) 
    at org.apache.spark.sql.execution.python.PythonUDF.doGenCode(PythonUDF.scala:27) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104) 
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:101) 
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740) 
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740) 

を...

私は多くのことを試しましたが、私はただ1つのパラメータ&引数でメソッドを呼び出そうとしましたが、助けにはなりませんでした。それはしかし仕事をした

一つの方法は、以下のように新しい列と新しいデータフレームを作成することである:

df_src_grp_hr_to_timestamp = df_src_grp_hr_d.select(
df_src_grp_hr_d['src_ip'], 
df_src_grp_hr_d['year'], 
df_src_grp_hr_d['month'], 
df_src_grp_hr_d['day'], 
df_src_grp_hr_d['hour'], 
df_src_grp_hr_d['time'].cast('timestamp')) 
+1

本当に誤植ではありませんか?あなたのエラーメッセージを見てください:式を評価できません:** parse_df_to_stringg **、余分な "g"を参照してください? – paisanco

+0

良好な観察のpaisanco。それは私がその間に試したもう一つのテストで、ただ一つのパラメータしか持たない関数でした。私は間違ってあなたを誘拐して申し訳ありませんが、例外はまだそこにあります。 – aks

+0

これは正しい「分」または「時間」ですか? –

答えて

0

オールライト:その後、私は、タイムスタンプに列をキャストすることができ

df_src_grp_hr_d = df_src.select('*', concat(
    col("year"), 
    lit("-"), 
    col("month"), 
    lit("-"), 
    col("day"), 
    lit(" "), 
    col("hour"), 
    lit(":0")).alias('time'))` 

。私は問題を理解していると思う...私のdataFrameはメモリにロードされた多くのデータを持っていたので、原因は、show()アクションが失敗するためです。

私はそれを実現する方法

は何が例外を引き起こしているということです。
Py4JJavaError: An error occurred while calling o2108.showString. 
: java.lang.UnsupportedOperationException: Cannot evaluate expression: 

は本当に df.show()アクションです。

私はからコードスニペットを実行していることを確認することができます: 働いConvert pyspark string to date format

from datetime import datetime 
from pyspark.sql.functions import col,udf, unix_timestamp 
from pyspark.sql.types import DateType 



# Creation of a dummy dataframe: 
df1 = sqlContext.createDataFrame([("11/25/1991","11/24/1991","11/30/1991"), 
          ("11/25/1391","11/24/1992","11/30/1992")], schema=['first', 'second', 'third']) 

# Setting an user define function: 
# This function converts the string cell into a date: 
func = udf (lambda x: datetime.strptime(x, '%M/%d/%Y'), DateType()) 

df = df1.withColumn('test', func(col('first'))) 

df.show() 

df.printSchema() 

!しかし、それはまだ私のdataFrame df_srcで動作しませんでした。

原因は私のデータベースサーバーからメモリにたくさんのデータをロードしているからです(8-900万行を超えるようなもの)。.show()の場合、spudはudf内で実行を実行できません。 dataFrameにロードされた結果のデフォルトで20エントリを表示します)。

show(n = 1)が呼び出されても、同じ例外がスローされます。

printSchema()が呼び出された場合、新しい列が効果的に追加されます。

新しい列が追加されているかどうかを確認する方法の1つは、単にアクションprint dataFrame.take(10)を呼び出すことです。

最後に、それを動作させるための一つの方法は、新しいデータフレームに影響を与えると(選択でUDFを呼び出すときに.show()を呼び出さないことです)のように:

df_to_string = df_src.select('*', 
      u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute']) 
     ) 

そして、それをキャッシュ:

df_to_string.cache 

今すぐ.show()は問題なしで呼び出すことができます:

df_to_string.show() 
関連する問題