2016-12-05 2 views
2

Sparkの使用私はcsvを読み込み、csvの列に関数を適用したいと考えています。私は動作するいくつかのコードがありますが、それは非常にハッキーです。これを行う適切な方法は何ですか?Sparkのcsvの単一の列に関数を適用する

私のコード私はちょうどlineに各行をマッピングして、line[index]上の関数を呼び出すのではなく、列名の関数を呼び出すことができるようにしたいと思い

SparkContext().addPyFile("myfile.py") 
spark = SparkSession\ 
    .builder\ 
    .appName("myApp")\ 
    .getOrCreate() 
from myfile import myFunction 

df = spark.read.csv(sys.argv[1], header=True, 
    mode="DROPMALFORMED",) 
a = df.rdd.map(lambda line: Row(id=line[0], user_id=line[1], message_id=line[2], message=myFunction(line[3]))).toDF() 

私はあなたが単にwithColumnと組み合わせるユーザー定義関数(udf)を使用することができスパークバージョン2.0.1

答えて

7

を使用しています:

from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 

udf_myFunction = udf(myFunction, IntegerType()) # if the function returns an int 
df.withColumn("message", udf_myFunction("_3")) #"_3" being the column name of the column you want to consider 

これが含むデータフレームdfに新しい列が追加されますmyFunction(line[3])の結果です。

+0

素晴らしいです、ありがとう、 'udf'が存在するかどうかはわかりませんでした。スーパーヘルプ。 – Sal

関連する問題