2016-09-27

I have a Spark DataFrame that looks like this:

root 
|-- employeeName: string (nullable = true) 
|-- employeeId: string (nullable = true) 
|-- employeeEmail: string (nullable = true) 
|-- company: struct (nullable = true) 
| |-- companyName: string (nullable = true) 
| |-- companyId: string (nullable = true) 
| |-- details: struct (nullable = true) 
| | |-- founded: string (nullable = true) 
| | |-- address: string (nullable = true) 
| | |-- industry: string (nullable = true) 

What I want to do is transform the DataFrame by grouping by company ID and getting an array of employees for each company, like this:

root 
|-- company: struct (nullable = true) 
| |-- companyName: string (nullable = true) 
| |-- companyId: string (nullable = true) 
| |-- details: struct (nullable = true) 
| | |-- founded: string (nullable = true) 
| | |-- address: string (nullable = true) 
| | |-- industry: string (nullable = true) 
|-- employees: array (nullable = true)  
| |-- employee: struct (nullable = true)   
| | |-- employeeName: string (nullable = true) 
| | |-- employeeId: string (nullable = true) 
| | |-- employeeEmail: string (nullable = true) 

Of course, I could do this easily with map and reduceByKey if I just had (company, employee) pairs of type (String, String). But with all the different nested information, I'm not sure which approach to take.
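For reference, here is a minimal sketch of the pair-RDD route the question describes. It assumes hypothetical case classes that mirror the question's schema (`Details`, `Company`, `Employee`, `EmployeeInfo` and all sample values are made up for illustration). Nothing has to be flattened: the key is just `companyId`, and the value can carry the whole nested `Company` alongside a list of employees, which `reduceByKey` concatenates.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case classes mirroring the question's nested schema
case class Details(founded: String, address: String, industry: String)
case class Company(companyName: String, companyId: String, details: Details)
case class Employee(employeeName: String, employeeId: String, employeeEmail: String, company: Company)
// Employee fields without the nested company, as in the target schema
case class EmployeeInfo(employeeName: String, employeeId: String, employeeEmail: String)

// In spark-shell, getOrCreate() returns the already running session
val spark = SparkSession.builder().master("local[*]").appName("pair-rdd-grouping").getOrCreate()

val d = Details("2001", "addr", "tech") // made-up sample values
val employees = Seq(
  Employee("n1", "1", "e1", Company("c1", "1", d)),
  Employee("n2", "2", "e2", Company("c1", "1", d)),
  Employee("n3", "3", "e3", Company("c2", "2", d))
)

// Key by companyId; the value holds the full Company plus a one-element employee list
val pairs = spark.sparkContext.parallelize(employees).map { e =>
  (e.company.companyId, (e.company, List(EmployeeInfo(e.employeeName, e.employeeId, e.employeeEmail))))
}

// Merge the employee lists per key; the Company is assumed identical for one companyId, so keep the first
val grouped: Map[String, (Company, List[EmployeeInfo])] =
  pairs.reduceByKey((a, b) => (a._1, a._2 ++ b._2)).collectAsMap().toMap
```

The order of employees inside each list is not guaranteed, since it depends on how partitions are merged.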

Should I flatten everything? An example that does something similar would be very helpful.

Answer

You can do the following:

// declaring data types 
case class Company(cName: String, cId: String, details: String) 
case class Employee(name: String, id: String, email: String, company: Company) 

// setting up example data 
val e1 = Employee("n1", "1", "[email protected]", Company("c1", "1", "d1")) 
val e2 = Employee("n2", "2", "[email protected]", Company("c1", "1", "d1")) 
val e3 = Employee("n3", "3", "[email protected]", Company("c1", "1", "d1")) 
val e4 = Employee("n4", "4", "[email protected]", Company("c2", "2", "d2")) 
val e5 = Employee("n5", "5", "[email protected]", Company("c2", "2", "d2")) 
val e6 = Employee("n6", "6", "[email protected]", Company("c2", "2", "d2")) 
val e7 = Employee("n7", "7", "[email protected]", Company("c3", "3", "d3")) 
val e8 = Employee("n8", "8", "[email protected]", Company("c3", "3", "d3")) 
val employees = Seq(e1, e2, e3, e4, e5, e6, e7, e8) 
import spark.implicits._ // provides .toDS; spark-shell already imports this automatically
val ds = sc.parallelize(employees).toDS 

// actual query to achieve what is mentioned in the question 
val result = ds.groupByKey(e => e.company).mapGroups((k, itr) => (k, itr.toList)) 
result.collect 

Result:

Array(

(Company(c1,1,d1),WrappedArray(Employee(n1,1,[email protected],Company(c1,1,d1)), Employee(n2,2,[email protected],Company(c1,1,d1)), Employee(n3,3,[email protected],Company(c1,1,d1)))), 

(Company(c2,2,d2),WrappedArray(Employee(n4,4,[email protected],Company(c2,2,d2)), Employee(n5,5,[email protected],Company(c2,2,d2)), Employee(n6,6,[email protected],Company(c2,2,d2)))), 

(Company(c3,3,d3),WrappedArray(Employee(n7,7,[email protected],Company(c3,3,d3)), Employee(n8,8,[email protected],Company(c3,3,d3))))) 

Important: you can pass any function you like to mapGroups, so you can shape each group however you want.
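To illustrate that point, here is a sketch under stated assumptions: it uses hypothetical case classes mirroring the question's schema (these differ from the simplified ones in the answer's code), groups by `companyId` as the question asks rather than by the whole `Company`, and passes a function to `mapGroups` that drops the redundant `company` field from each employee. The output case class `CompanyEmployees` is a made-up name whose field names match the target schema.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case classes mirroring the question's nested schema
case class Details(founded: String, address: String, industry: String)
case class Company(companyName: String, companyId: String, details: Details)
case class Employee(employeeName: String, employeeId: String, employeeEmail: String, company: Company)
case class EmployeeInfo(employeeName: String, employeeId: String, employeeEmail: String)
// Output row: field names chosen to match the target schema (company, employees)
case class CompanyEmployees(company: Company, employees: Seq[EmployeeInfo])

val spark = SparkSession.builder().master("local[*]").appName("map-groups").getOrCreate()
import spark.implicits._

val d = Details("2001", "addr", "tech") // made-up sample values
val ds = Seq(
  Employee("n1", "1", "e1", Company("c1", "1", d)),
  Employee("n2", "2", "e2", Company("c1", "1", d)),
  Employee("n3", "3", "e3", Company("c2", "2", d))
).toDS()

val result = ds
  .groupByKey(_.company.companyId)
  .mapGroups { (_, rows) =>
    val es = rows.toList // a group always contains at least one row, so head is safe
    CompanyEmployees(
      es.head.company, // assumes all rows with one companyId carry the same company
      es.map(e => EmployeeInfo(e.employeeName, e.employeeId, e.employeeEmail)))
  }

result.printSchema() // company struct plus an employees array of structs
val collected = result.collect()
```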

Hope this helps.
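A different technique, not taken from the answer above: the untyped DataFrame API can build the target schema directly with `collect_list(struct(...))`, so the nested data does not need to be flattened. This is a minimal sketch assuming hypothetical case classes that mirror the question's schema, made-up rows, and that every row with a given `companyId` carries the same `company` struct (otherwise `first` would pick an arbitrary one).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, first, struct}

// Hypothetical case classes mirroring the question's nested schema
case class Details(founded: String, address: String, industry: String)
case class Company(companyName: String, companyId: String, details: Details)
case class Employee(employeeName: String, employeeId: String, employeeEmail: String, company: Company)

val spark = SparkSession.builder().master("local[*]").appName("collect-list-grouping").getOrCreate()
import spark.implicits._

val d = Details("2001", "addr", "tech") // made-up sample values
val df = Seq(
  Employee("n1", "1", "e1", Company("c1", "1", d)),
  Employee("n2", "2", "e2", Company("c1", "1", d)),
  Employee("n3", "3", "e3", Company("c2", "2", d))
).toDF() // nested case classes become nested struct columns

val result = df
  .groupBy(col("company.companyId").as("companyId"))
  .agg(
    // all rows in a group are assumed to share one company struct, so any one will do
    first(col("company")).as("company"),
    // pack the employee fields into a struct and gather them into an array
    collect_list(struct(col("employeeName"), col("employeeId"), col("employeeEmail"))).as("employees"))
  .drop("companyId")

result.printSchema()
```

Grouping on the whole struct with `groupBy(col("company"))` should also work, which is closer to what the answer's `groupByKey(e => e.company)` does.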


Thanks, I was able to solve it in a similar way. – Dmitri
