2017-11-03 10 views
1

SparseVectorがあり、その値を合計したいとします。Pyspark:SparseVectorの合計誤差

v = SparseVector(15557, [3, 40, 45, 103, 14356], np.ones(5)) 
v.values.sum() 

5.0 

これはうまくいきます。今私はでDataFrameSparseVectorの列があるので、同じことをしたいと思います。私は本当に理解していない

from pyspark.sql import functions as f 

def sum_vector(vector): 
    return vector.values.sum() 

sum_vector_udf = f.udf(lambda x: sum_vector(x)) 

sum_vector_udf(v) 

---- 

AttributeError       Traceback (most recent call last) 
<ipython-input-38-b4d44c2ef561> in <module>() 
     1 v = SparseVector(15557, [3, 40, 45, 103, 14356], np.ones(5)) 
     2 
----> 3 sum_vector_udf(v) 
     4 #v.values.sum() 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/functions.py in wrapper(*args) 
    1955   @functools.wraps(f) 
    1956   def wrapper(*args): 
-> 1957    return udf_obj(*args) 
    1958 
    1959   wrapper.func = udf_obj.func 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/functions.py in __call__(self, *cols) 
    1916   judf = self._judf 
    1917   sc = SparkContext._active_spark_context 
-> 1918   return Column(judf.apply(_to_seq(sc, cols, _to_java_column))) 
    1919 
    1920 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/column.py in _to_seq(sc, cols, converter) 
    58  """ 
    59  if converter: 
---> 60   cols = [converter(c) for c in cols] 
    61  return sc._jvm.PythonUtils.toSeq(cols) 
    62 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/column.py in <listcomp>(.0) 
    58  """ 
    59  if converter: 
---> 60   cols = [converter(c) for c in cols] 
    61  return sc._jvm.PythonUtils.toSeq(cols) 
    62 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/column.py in _to_java_column(col) 
    46   jcol = col._jc 
    47  else: 
---> 48   jcol = _create_column_from_name(col) 
    49  return jcol 
    50 

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/column.py in _create_column_from_name(name) 
    39 def _create_column_from_name(name): 
    40  sc = SparkContext._active_spark_context 
---> 41  return sc._jvm.functions.col(name) 
    42 
    43 

~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args) 
    1122 
    1123  def __call__(self, *args): 
-> 1124   args_command, temp_args = self._build_args(*args) 
    1125 
    1126   command = proto.CALL_COMMAND_NAME +\ 

~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in _build_args(self, *args) 
    1092 
    1093   args_command = "".join(
-> 1094    [get_command_part(arg, self.pool) for arg in new_args]) 
    1095 
    1096   return args_command, temp_args 

~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in <listcomp>(.0) 
    1092 
    1093   args_command = "".join(
-> 1094    [get_command_part(arg, self.pool) for arg in new_args]) 
    1095 
    1096   return args_command, temp_args 

~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in get_command_part(parameter, python_proxy_pool) 
    287    command_part += ";" + interface 
    288  else: 
--> 289   command_part = REFERENCE_TYPE + parameter._get_object_id() 
    290 
    291  command_part += "\n" 

AttributeError: 'SparseVector' object has no attribute '_get_object_id' 

、私は2つの異なる方法でまったく同じことを書いている:ここで私は、私は理解していないエラーが発生します。任意のヒント?

答えて

0

udfのdataframeの列をselectまたはwithColumnに渡します。

import pyspark.sql.functions as F 
df = df.withColumn('new_column_name', sum_vector_udf(F.col('column_name_of_sparse_vectors')) 

または、

df = df.select([sum_vector_udf(F.col('column_name_of_sparse_vectors').alias('new_column_name')]) 

第一の方法は、新しい列を追加し、第二の方法は、新しいデータフレーム内の同じ列に置き換えられます。

+0

助けてくれてありがとう。私はあなたの両方のソリューションを試しましたが、私はいつも私のdfと同じエラーを受け取ります: "ClassDict(numpy.dtype用)の構築に期待されるゼロ引数"。代わりにudfをdfの外側で実行すると、上に投稿したものが得られます。 –

+0

udfはnumpy dtypeを受け付けません。スパースベクトルを(SparseVector(15557、[3,40,45,103,14356]、np.ones(5))、)と書く。 –

+0

私はこの構文が何を意味するのか正確にはわかりませんが、私は試してみましたが、上記の同じエラーが発生しています。 numpyのdtypeについて、私は@ user6910411の回答に取り組んでいます;) –

0

これは、udfが戻り値の型としてNumPy型をサポートしていないために発生します。

>>> type(v.values.sum()) 
<class 'numpy.float64'> 

あなたは、標準のPythonの型に結果をキャストする必要があります。

df.select(sum_vector("v")).show() 
+-------------+ 
|sum_vector(v)| 
+-------------+ 
|   5.0| 
+-------------+ 
:あなたが期待する結果を得ることができますどちらの場合も

df = spark.createDataFrame([(v,)], ["v"]) 

@udf("double") 
def sum_vector(vector): 
    return vector.values.sum().tolist() 

または

@udf("double") 
def sum_vector(vector): 
    return float(vector.values.sum())