2017-02-27 16 views
3

に対応するカラム名を検索:スカラ/スパークデータフレーム:データフレームを持つ、スカラ座/スパークで最大

val dfIn = sqlContext.createDataFrame(Seq(
    ("r0", 0, 2, 3), 
    ("r1", 1, 0, 0), 
    ("r2", 0, 2, 2))).toDF("id", "c0", "c1", "c2") 

私が対応する列の名前を保持する新しい列maxColを計算したいと思います(行ごとに)最大値に設定します。この例では、出力は次のようになります。

+---+---+---+---+------+ 
| id| c0| c1| c2|maxCol| 
+---+---+---+---+------+ 
| r0| 0| 2| 3| c2| 
| r1| 1| 0| 0| c0| 
| r2| 0| 2| 2| c1| 
+---+---+---+---+------+ 

実際にデータフレームには60を超える列があります。したがって、一般的な解決策が必要です。 Pythonのパンダ(はい、私は知っている、私はpysparkと比較する必要があります...)で

同等は次のようになります。

dfOut = pd.concat([dfIn, dfIn.idxmax(axis=1).rename('maxCol')], axis=1) 
+1

一般にいくつのカラム数がありますか? – mrsrinivas

+0

私は約60列を持っています – ivankeller

+0

最大列と比較してどれだけ多くのものがありますか? – mrsrinivas

答えて

7

あなたがgreatest機能を使用することができ、小さなトリックで。必要な輸入は:

import org.apache.spark.sql.functions.{col, greatest, lit, struct} 

まず者は、最初の要素が値であるstructs、2つ目のカラム名のリストを作成してみましょう:

次のように greatestに渡すことができ、このような
val structs = dfIn.columns.tail.map(
    c => struct(col(c).as("v"), lit(c).as("k")) 
) 

構造:

dfIn.withColumn("maxCol", greatest(structs: _*).getItem("k")) 
+---+---+---+---+------+ 
| id| c0| c1| c2|maxCol| 
+---+---+---+---+------+ 
| r0| 0| 2| 3| c2| 
| r1| 1| 0| 0| c0| 
| r2| 0| 2| 2| c2| 
+---+---+---+---+------+ 

絆の場合、それはなりますのでご注意くださいシーケンスの後の方で発生する要素を取る(辞書編集的に(x, "c2") > (x, "c1"))。何らかの理由でこれが受け入れられない場合は、明示的whenを減らすことができます。nullable列の場合

import org.apache.spark.sql.functions.when 

val max_col = structs.reduce(
    (c1, c2) => when(c1.getItem("v") >= c2.getItem("v"), c1).otherwise(c2) 
).getItem("k") 

dfIn.withColumn("maxCol", max_col) 
+---+---+---+---+------+ 
| id| c0| c1| c2|maxCol| 
+---+---+---+---+------+ 
| r0| 0| 2| 3| c2| 
| r1| 1| 0| 0| c0| 
| r2| 0| 2| 2| c1| 
+---+---+---+---+------+ 

は、あなたがこれを調整する必要があり、例えばcoalescingによって値に-Infに。

関連する問題