2016-10-13 12 views
0

にカンマ区切りで新しい列に入力されたデータのフォーマットを同じ列の値を連結する:方法を次のようにスパーク

+--------------------+-------------+--------------------+ 
|   date  |  user |   product | 
+--------------------+-------------+--------------------+ 
|  2016-10-01 |  Tom |   computer | 
+--------------------+-------------+--------------------+ 
|  2016-10-01 |  Tom |   iphone | 
+--------------------+-------------+--------------------+ 
|  2016-10-01 |  Jhon |    book | 
+--------------------+-------------+--------------------+ 
|  2016-10-02 |  Tom |    pen | 
+--------------------+-------------+--------------------+ 
|  2016-10-02 |  Jhon |    milk | 
+--------------------+-------------+--------------------+ 

及び出力のフォーマットを次のように

+-----------+-----------------------+ 
|  user |  products  | 
+-----------------------------------+ 
|  Tom | computer,iphone,pen | 
+-----------------------------------+ 
|  Jhon |   book,milk | 
+-----------------------------------+ 

出力には、ユーザーが注文したすべての商品が日付順に表示されます。

スパークを使用してこれらのデータを処理したいのですが、助けてください。ありがとうございました。

+2

を持っていません[リストの添付を実行AGGとスパークデータフレームGROUPBY]の可能性のある重複します(http:/ /stackoverflow.com/questions/34202997/spark-dataframe-groupby-with-agg-performing-list-appending) – mtoto

+0

[apache spark dataframeの列を連結]の可能な複製(http://stackoverflow.com/questions/31450846/ concatenate-columns-in-apache-spark-dataframe) –

答えて

1

むしろGROUPBYよりマップreduceBykey()を組み合わせて使用​​することをお勧め..また、データを仮定すると、

#Read the data using val ordersRDD = sc.textFile("/file/path") 
val ordersRDD = sc.parallelize(List(("2016-10-01","Tom","computer"), 
    ("2016-10-01","Tom","iphone"), 
    ("2016-10-01","Jhon","book"), 
    ("2016-10-02","Tom","pen"), 
    ("2016-10-02","Jhon","milk"))) 

#group by (date, user), sort by key & reduce by user & concatenate products 
val dtusrGrpRDD = ordersRDD.map(rec => ((rec._2, rec._1), rec._3)) 
    .sortByKey().map(x=>(x._1._1, x._2)) 
    .reduceByKey((acc, v) => acc+","+v) 

#if needed, make it to DF 
scala> dtusrGrpRDD.toDF("user", "product").show() 
+----+-------------------+ 
|user|   product| 
+----+-------------------+ 
| Tom|computer,iphone,pen| 
|Jhon|   book,milk| 
+----+-------------------+ 
+0

あなたのスパークはどのバージョンですか?ここにはエラーがあります:シンボルsortByKeyを解決できません。 – StrongYoung

+0

大変申し訳ありませんが、今は大丈夫です、どうもありがとうございます。 – StrongYoung

+0

RDDではなくDataFramesを使用するのが一般的です。これは、実行エンジンに最適な実行計画を決定させ、さまざまな最適化(述部のプッシュダウン、コード生成など)を自動的に適用できるためです。 https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html – RyanW

0

あなたは(あなたがする必要があります)HiveContextを使用している場合のpythonを使用して

例:

from pyspark.sql.functions import collect_set 

df = ... load your df ... 
new_df = df.groupBy("user").agg(collect_set("product").alias("products")) 

を使用すると、重複排除製品の結果のリストをしたくない場合は、代わりにcollect_list使用することができます。

関連する問題