2016-04-11 7 views
1

JSONにデータフレーム内のいくつかのフィールドを書き込もうとしています。データフレームの私のデータ構造はJSONにフィールドセットを書き込む方法は?

Key|col1|col2|col3|col4 
key|a |b |c |d 
Key|a1 |b1 |c1 |d1 

今私は、JSONにCOL4フィールドにちょうどCOL1を変換し、JSONフィールドに

期待出力

[Key,{cols:[{col1:a,col2:b,col3:c,col4:d},{col1:a1,col2:b1,col3:c1,col4:d1}] 

を名前を付けしようとしていますされます私はこれのためにudfを書いた。あなたが見ることができるように

val summary = udf( 
(col1:String, col2:String, col3:String, col4:String) => "{\"cols\":[" + " {\"col1\":" + col1 + ",\"col2\":" + col2 + ",\"col3\":" + col3 + ",\"col4\":" + col4 + "}]}" 
) 

val result = input.withColumn("Summary",summary('col1,'col2,'col3,'col4)) 
val result1 = result.select('Key,'Summary) 
result1.show(10) 

これは

[Key,{cols:[{col1:a,col2:b,col3:c,col4:d}]}] 
[Key,{cols:[{col1:a1,col2:b1,col3:c1,col4:d1}]}] 

私の結果であり、それらがグループ化されていません。 UDF自体を使用してこれらの行をグループ化する方法はありますか?私はscala/Sparkを初めて使い、適切なudfを理解することができません。

+0

私はあなたが正しくあなたの「予想出力」を終了しましたとは思いません。私は、オープニング "[{"と一致するように最後に "}]"があることを期待しています。 –

答えて

1

UDFは1つの行を1つの行にマッピングします。 DataFrameに複数の行を1つの要素に結合する場合は、reduceByKeyのような複数の行を集約する関数を使用する必要があります。

はこれを行うにはDataFrame特定の機能があるかもしれませんが、私はそうのように、RDD機能で、この処理を行うだろう:

val colSummary = udf( 
(col1:String, col2:String, col3:String, col4:String) => "{\"col1\":" + col1 + ",\"col2\":" + col2 + ",\"col3\":" + col3 + ",\"col4\":" + col4 + "}" 
) 
val colRDD = input.withColumn("Summary",summary('col1,'col2,'col3,'col4)).rdd.map(x => (x.getString(0),x.getString(5))) 

これは私たちに私たちがPairRDDFunctionsを使用できるようになりますRDD[(String,String)]を与えますreduceByKeydocsを参照)のように。タプルのキーは元のキーです。この値は、colsリストを作成するために集約する必要のある単一要素のjsonエンコーディングです。カンマで区切ったリストにすべてを貼り付けてから、開始と終了を追加して、それで終了です。

val result = colRDD.reduceByKey((x,y) => (x+","+y)).map(x => "["+x._1+",{\"cols\":["+x._2+"]}]") 
result.take(10) 
+1

ありがとうございました。ちょっとした編集(このコメントで説明)を答えてください。 val colRDDには最後に ')'がありません。また結果には最後に「)」がありません。 colRDDでは、x.getString(1)の代わりにx.getString(5)が使用されます。これは、入力データフレームにフィールドを追加した後、Summaryフィールドが5番目のフィールドになるためです。最後にresult.take(10)は 'result'がrddです。 – dheee

2
// Create your dataset 
scala> val ds = Seq((1, "hello", 1L), (2, "world", 2L)).toDF("id", "token", "long") 
ds: org.apache.spark.sql.DataFrame = [id: int, token: string ... 1 more field] 

// select the fields you want to map to json 
scala> ds.select('token, 'long).write.json("your-json") 

// check the result 
➜ spark git:(master) ✗ ls -ltr your-json/ 
total 16 
-rw-r--r-- 1 jacek staff 27 11 kwi 17:18 part-r-00007-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00006-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00005-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00004-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 27 11 kwi 17:18 part-r-00003-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00002-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00001-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00000-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 _SUCCESS 
➜ spark git:(master) ✗ cat your-json/part-r-00003-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
{"token":"hello","long":1} 
➜ spark git:(master) ✗ cat your-json/part-r-00007-91f81f62-54bb-42ae-bddc-33829a0e3c16.json 
{"token":"world","long":2} 
関連する問題