を使用して参加する2 RDDs
は、私が持っている2 RDDSはScalaの
最初の1 (productID,category)
秒1 (customerID,productID,quantity)
どのように私は出力が(customerID,category,quantity)
のように見えることができますか?
ロジックは、第rdd
のcategory
に対応する第二rdd
の各productID
を交換することです。
私は解決策のためのScalaを使用したい
を使用して参加する2 RDDs
は、私が持っている2 RDDSはScalaの
最初の1 (productID,category)
秒1 (customerID,productID,quantity)
どのように私は出力が(customerID,category,quantity)
のように見えることができますか?
ロジックは、第rdd
のcategory
に対応する第二rdd
の各productID
を交換することです。
私は解決策のためのScalaを使用したい
あなたが持っていると思われるための2つのcase classes
あなたの2 rdds
case class Products(productId:String, category:String)
case class Customer(customerId:String, productId:String, quantity:Int)
次の2つのrdds
val rdd1 = sc.parallelize(Seq(
Products("product1", "category1"),
Products("product2", "category2"),
Products("product3", "category3"),
Products("product4", "category4")
))
val rdd2 = sc.parallelize(Seq(
Customer("customer1", "product1", 5),
Customer("customer1", "product2", 6),
Customer("customer2", "product3", 2),
Customer("customer2", "product4", 9)
))
として、あなたは、単にできてい
join
2つの
rdds
がproductIdになっていますが、それらに参加する前にcreatする必要があります商品IDをキーにした
pairRDD
のed。
rdd1.map(prod => (prod.productId, prod))
rdd2.map(customer => (customer.productId, customer))
最後のステップは単純なjoin
であり、必要な値を選択します。
rdd1.join(rdd2).map(x => (x._2._2.customerId, x._2._1.category, x._2._2.quantity))
私は、これはこれは、人々はあなたがサンプルデータと一緒に試してみたものを示すことによって、あなたを助けるために役立つだろう
ありがとうございます... ここで 'customer'と' products'は1つの値のペアです このようにして、または 'mapValues(x => x(0)、...)でクラスの値にアクセスすることはできません) ' –
' customer'と 'products'は一つの値ではありません。彼らは 'Tuple2'です。もしそれらが '配列'として結合されていれば、 'x(0)'を行うことができますが、それらは 'タプル'にありますので、正しい値を得る方法は '_1 .....'を使うことです。 ''それは余分な作業になります。あなたはまだそれが明らかでない場合は、より多くを求めることができます。 :) –
に役立ちます願っています。私が正しくあなたの質問を理解していればとにかく、読みやすいアプローチは、最初の製品IDで、その後、データフレームにあなたのRDDSを変換し、それらに参加するだろう、次のように:スタックオーバーフローへ
val rddProduct = sc.parallelize(Seq(
(101, "cat1"),
(102, "cat1"),
(103, "cat2"),
(104, "cat3")
))
// Convert 1st RDD to DataFrame
val dfProduct = rddProduct.toDF("productId", "category")
val rddOrder = sc.parallelize(Seq(
(1, 101, 10),
(1, 103, 5),
(2, 101, 15),
(2, 102, 10),
(2, 103, 10),
(3, 101, 15),
(3, 102, 5),
(3, 104, 5)
))
// Convert 2nd RDD to DataFrame
val dfOrder = rddOrder.toDF("customerId", "productId", "quantity")
// Join dataframes by prodId
val dfResult = dfOrder.as("o").join(
dfProduct.as("p"), $"o.productId" === $"p.productId", "inner"
).
select($"o.customerId", $"p.category", $"o.quantity").
orderBy($"o.customerId", $"p.category", $"o.quantity")
dfResult.show
+----------+--------+--------+
|customerId|category|quantity|
+----------+--------+--------+
| 1| cat1| 10|
| 1| cat2| 5|
| 2| cat1| 10|
| 2| cat1| 15|
| 2| cat2| 10|
| 3| cat1| 5|
| 3| cat1| 15|
| 3| cat3| 5|
+----------+--------+--------+
// Convert back to RDD, if necessary
val rddResult = dfResult.rdd
ようこそ。 [ツアー](https://stackoverflow.com/tour)と[質問](https://stackoverflow.com/help/asking)をご覧ください。 – Shiro