2016-05-07 7 views
6

これは簡単ですが.... Spark 1.6.1を使用しています.... DataFrame#1に列A、B、C 。値を持つ:これまでのところは良い最初のデータフレームの列に基づいて新しい列値を持つ新しいSpark DataFrameを作成するJava

DataFrame df2 = df1.withColumn("D", df1.col("C")); 

を私は実際には、列Dの値は、条件付きすなわちになりたい::

A B C 
1 2 A 
2 2 A 
3 2 B 
4 2 C 

私はそのように新しい列Dで新しいデータフレームを作成

// pseudo code 
if (col C = "A") the col D = "X" 
else if (col C = "B") the col D = "Y" 
else col D = "Z" 

私は列Cをドロップし、Dの名前をCに変更します。私は列関数を見てみましたが、何も手形に合うようには見えません。私はdf1.rdd()。map()を使って行全体を繰り返し処理することを考えましたが、それを実際に動作させないことを除けば、DataFrames全体がRDD抽象化から離れようとしていました。

残念ながら私はJavaでこれを行う必要があります(もちろんSpark with Javaは最適ではありません!!)。私は明らかに不足しているようで、解決策を提示するとばかげていることが示されてうれしいです!

答えて

12

私はあなたがそれを達成するためにwhenを使用することができると信じています。さらに、古い列を直接置き換えることもできます。あなたの例では、コードのようなものになります:whenの詳細については

import static org.apache.spark.sql.functions.*; 

Column newCol = when(col("C").equalTo("A"), "X") 
    .when(col("C").equalTo("B"), "Y") 
    .otherwise("Z"); 

DataFrame df2 = df1.withColumn("C", newCol); 

Column Javadocを確認してください。ダニエル・へ

+1

おかげで、その後、他の構造であれば、単純なを書きます。■ - 私が欠けていたものをSQL関数の静的インポートは、IEた:インポート静的org.apacheを。 spark.sql.functions。* – user1128482

+0

@ user1128482申し訳ありませんが、私はインポートを忘れました。あなたが最後に見つけたことを知ってよかった。 –

2

おかげで不足している部分がSQL関数

import static org.apache.spark.sql.functions.*; 

の静的インポートだった私はこれを解決してきた:)

私は時に使用するの百万さまざまな方法を試している必要がありますが、コンパイルました私がインポートをしなかったので、失敗/ランタイムエラーが発生しました。一度インポートされたダニエルの答えにスポット!

1

udfを使って同じ仕事をすることもできます。私が実際に顔に明白なことを見つめていた - ちょうどこのため

import org.apache.spark.sql.functions.udf 
val customFunct = udf { d => 
     //if then else construct 
    } 

val new_DF= df.withColumn(column_name, customFunct(df("data_column"))) 
関連する問題