2017-12-16 7 views
1

scalaとSparkのgroupByKeyにいくつかの問題があります。 私は2ケースクラスがあります。groupByKeyで値を追加

case class Employee(id_employee: Long, name_emp: String, salary: String) 

私は、この第二ケースクラスを使用した瞬間のために:

case class Company(id_company: Long, employee:Seq[Employee]) 

しかし、私はこの新しいものと交換したい:

case class Company(id_company: Long, name_comp: String employee:Seq[Employee]) 

を親データセット(df1)は、groupByKeyと一緒に使用して、Companyオブジェクトを作成します。

このコードが動作
val companies = df1.groupByKey(v => v.id_company) 
.mapGroups(
    { 
    case(k,iter) => Company(k, iter.map(x => Employee(x.id_employee, x.name_emp, x.salary)).toSeq) 
    } 
).collect() 

、それはこの1のようなオブジェクトを返します。

Company(1234,List(Employee(0987, John, 30000),Employee(4567, Bob, 50000))) 

しかし、私は(このフィールドはDF1存在する)これらのオブジェクトに会社name_compを追加するためのヒントを見つけることができません。 (新しいケースクラスを使用して)このようなオブジェクトを取得するためには:

あなたは会社のIDと名前の両方をしたいので
Company(1234, NYTimes, List(Employee(0987, John, 30000),Employee(4567, Bob, 50000))) 
+0

は、あなたのケースクラス – Tanjin

+0

のいずれかのフィールド/属性として 'name_comp'が表示されない私はこのケースクラスを使用する必要がありますが、これを次のように置き換えてください: case class Company(id_company:Long、namp_comp:String employee:Seq [Employee]) –

+1

'df1'の型を指定できますか? 'Dataset [Company]'のように思えるかもしれませんが、それはコードがコンパイルされるべきではないということを意味します...それを明確にすることは助けになります。 – Tanjin

答えて

2

は、何を行うことができますが鍵は、グループデータとしてタプルを使用することです。 Companyクラスを構築するとき、これは、両方の値を簡単に利用できるようになります:私は一瞬のために私のポストを:)更新@Tanjin

df1.groupByKey(v => (v.id_company, v.name_comp)) 
    .mapGroups{ case((id, name), iter) => 
    Company(id, name, iter.map(x => Employee(x.id_employee, x.name_emp, x.salary)).toSeq)} 
    .collect() 
+0

ありがとうございます!それは完全に動作します!あなたは私の週を救った.End:D –

+0

@PierreJones:問題はなく、うまくいきました。 :) – Shaido