2017-07-10 12 views
-3

を注文収集し、私は以下のようにデータフレームを持つデータフレームの列

A B C 
1 3 1 
1 8 2 
1 5 3 
2 2 1 

私の出力は、列Bが、私はこのような何かを書いた最初の列Bの値

A B 
1 3,1/5,3/8,2 
2 2,1 

に基づいて順序付けされるべきですスカラーです

df.groupBy("A").withColumn("B",collect_list(concat("B",lit(","),"C")) 

しかし、不具合が私の問題を解決します。

+0

あなたの質問を編集して、データとコードの正しい形式を表示してください。これを強調表示してCtrl + Kを押すことでこれを行うことができます – WhatsThePoint

+0

B列はどのように派生していますか? - その不明確 – jsdeveloper

+0

列ごとのグループA – Deek

答えて

1

あなたはあなたが簡単なgroupByaggregationsを行うと、あなたが使用することができますfunctions

df.orderBy("B").groupBy("A").agg(collect_list(concat_ws(",", col("B"), col("C"))) as "B") 

を使用することにより

+---+---------------+ 
|A |B    | 
+---+---------------+ 
|1 |[3,1, 5,3, 8,2]| 
|2 |[2,1]   | 
+---+---------------+ 

として次のような出力を得ることができます

+---+---+---+ 
|A |B |C | 
+---+---+---+ 
|1 |3 |1 | 
|1 |8 |2 | 
|1 |5 |3 | 
|2 |2 |1 | 
+---+---+---+ 

ように、入力データフレームを持っていることを考えるとudf関数t O

def joinString = udf((b: mutable.WrappedArray[String]) => { 
    b.mkString("/") 
}) 

newdf.withColumn("B", joinString(col("B"))).show(false) 

として最終的に望ましい結果を得るあなたが

+---+-----------+ 
|A |B   | 
+---+-----------+ 
|1 |3,1/5,3/8,2| 
|2 |2,1  | 
+---+-----------+ 

あなたは

編集

を動作するように上記のすべてのための import org.apache.spark.sql.functions._が必要になりますを取得する必要があります

列Bが最初の列Bの値に基づいて、このために

を発注しているあなただけ

import org.apache.spark.sql.functions._ 
val newdf = df.groupBy("A").agg(collect_list(concat_ws(",", col("B"), col("C"))) as "B") 

def joinString = udf((b: mutable.WrappedArray[String]) => { 
    b.mkString("/") 
}) 

newdf.withColumn("B", joinString(col("B"))).show(false) 

としてORDERBYの一部を削除することができますし、

+---+-----------+ 
|A |B   | 
+---+-----------+ 
|1 |3,1/8,2/5,3| 
|2 |2,1  | 
+---+-----------+ 
+0

しかし、これは私に注文を与えることはありません – Deek

+0

それはあなたが見ることができるように既に配列で注文されています。 –

+0

これは、withcolumnがrelationalgroupeddatasetのメンバーではないというエラーをスローします – Deek

0
として出力を取得する必要があります

これは、concat_ws関数を使用してグループAの列Aを使用してリストを収集することで実現できます。

val df1 = spark.sparkContext.parallelize(Seq(
    (1, 3, 1), 
    (1, 8, 2), 
    (1, 5, 3), 
    (2, 2, 1) 
)).toDF("A", "B", "C") 

val result = df1.withColumn("B", concat_ws("/", $"B", $"C")) 

result.groupBy("A").agg(collect_list($"B").alias("B")).show 

出力:

+---+---------------+ 
| A|    B| 
+---+---------------+ 
| 1|[3/1, 8/2, 5/3]| 
| 2|   [2/1]| 
+---+---------------+ 

編集:

+---+-----------+ 
| A|   B| 
+---+-----------+ 
| 1|3,1/5,3/8,2| 
| 2|  2,1| 
+---+-----------+ 
:ここ
あなたは、列B

val format = udf((value : Seq[String]) => { 
    value.sortBy(x => {x.split(",")(0)}).mkString("/") 
}) 


val result = df1.withColumn("B", concat_ws(",", $"B", $"C")) 
    .groupBy($"A").agg(collect_list($"B").alias("B")) 
    .withColumn("B", format($"B")) 

result.show() 

出力をソートする場合は、あなたが何ができるかであります

希望これは役に立ちました!

+0

私の出力は基本的に[3,1/8,2/5,3]になるはずです – Deek

+0

後で必要に応じてソートする必要がある場合は、グループの後に注文を取得します。上記のコードは、これが役立つように更新されました –