2016-06-15 8 views
3

を生成するために、MAX()関数を使用することができません:withColumnは、私はこのようなデータセット持っている私は、新しい列

a = sc.parallelize([[1,2,3],[0,2,1],[9,8,7]]).toDF(["one", "two", "three"]) 

を私は最大値に等しい新しい列を追加し、データセットを持つようにしたいです他の3つの列に表示されます。 出力は次のようになります。

+----+----+-----+-------+ 
|one |two |three|max_col| 
+----+----+-----+-------+ 
| 1| 2| 3|  3| 
| 0| 2| 1|  2| 
| 9| 8| 7|  9| 
+----+----+-----+-------+ 

私はそうのように、withColumnを使用するだろうと思った:

b = a.withColumn("max_col", max(a["one"], a["two"], a["three"])) 

が、これは誤り

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/opt/spark152/python/pyspark/sql/column.py", line 418, in __nonzero__ 
    raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', " 
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions. 

奇数が得られます。 maxboolを返しますか? the documentation on maxによるものではありません。はい。奇妙な。

私はそれは奇妙な、これが機能することを見つける:

b = a.withColumn("max_col", a["one"] + a["two"] + a["three"])) 

そして、それが動作するという事実は私がmaxは、私は理解していないいくつかの方法を動作していることがさらに強く考えさせます。

また、私はb = a.withColumn("max_col", max([a["one"], a["two"], a["three"]]))を試しましたが、これは3つの列で3つの列ではなく3つの列になります。これにより、上記と同じエラーが発生します。

答えて

4

実際にあなたがここに必要なものgreatestmaxではありません。

from pyspark.sql.functions import greatest 

a.withColumn("max_col", greatest(a["one"], a["two"], a["three"])) 

そして、ちょうど完全性のためにあなたは、最小値を求めるためにleastを使用することができます。

from pyspark.sql.functions import least 

a.withColumn("min_col", least(a["one"], a["two"], a["three"])) 

あなたはそれが非常に簡単であることがわかりエラーについて。 maxは豊富な比較に依存します。

type(col("a") < col("b") 
## pyspark.sql.column.Column 

PySpark明示的にそれは単に意味がないので、(あなたがColumn.__nonzero__ソースを確認することができます)ブールに列を変換禁じ:あなたは2つの列を比較するとき、あなたはColumnが得ます。これは、ドライバーのコンテキストでは評価できない論理式だけです。

1

私が正しく理解すれば、列の最大値と行の最大値が一致しません。実際に.withColumnは列を受け取る必要があります。必要なのは行操作です。

b=a.map(lambda row: (row.one, row.two, row.three, max(row))) 

bは、その後RDDである、あなたはそれが期待pyspark.sql.Columnを返さないとあなたのpythonからmaxを使用することはできません

b.toDF('one','two','three','max') 
0

DATAFRAMEに変換することができます。 pysparkのデータフレーム機能の例は、戻り注意、いくつかの列からリストを作成したarrayです:あなたが必要なものを達成するために

http://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#array

を、あなたは(未テスト)のように、ユーザー定義関数を書くことができ

from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 

def my_max(*cols): 
    return max(cols) 

udf_my_max = udf(my_max, IntegerType) 

df.withColumn('max_col', udf_my_max(a.columns)) 
+0

残念ながら、これは私のためには機能しませんでした。テストするチャンスがなかったので、それは小さな問題/バグです。私はRDDではなくDataFramesを使い続けることを好むので、実用的なソリューションを見つけたらそれを感謝します! –

関連する問題