2017-06-03 4 views
-1

を使用して参加する2 RDDsは、私が持っている2 RDDSはScalaの

最初の1 (productID,category)

秒1 (customerID,productID,quantity)

どのように私は出力が(customerID,category,quantity)のように見えることができますか?

ロジックは、第rddcategoryに対応する第二rddの各productIDを交換することです。

私は解決策のためのScalaを使用したい

+0

ようこそ。 [ツアー](https://stackoverflow.com/tour)と[質問](https://stackoverflow.com/help/asking)をご覧ください。 – Shiro

答えて

1

あなたが持っていると思われるための2つのcase classesあなたの2 rdds

case class Products(productId:String, category:String) 
case class Customer(customerId:String, productId:String, quantity:Int) 

次の2つのrdds

val rdd1 = sc.parallelize(Seq(
    Products("product1", "category1"), 
    Products("product2", "category2"), 
    Products("product3", "category3"), 
    Products("product4", "category4") 
)) 
val rdd2 = sc.parallelize(Seq(
    Customer("customer1", "product1", 5), 
    Customer("customer1", "product2", 6), 
    Customer("customer2", "product3", 2), 
    Customer("customer2", "product4", 9) 
)) 

として、あなたは、単にできてい join 2つの rddsがproductIdになっていますが、それらに参加する前にcreatする必要があります商品IDをキーにした pairRDDのed。

rdd1.map(prod => (prod.productId, prod)) 
rdd2.map(customer => (customer.productId, customer)) 

最後のステップは単純なjoinであり、必要な値を選択します。

rdd1.join(rdd2).map(x => (x._2._2.customerId, x._2._1.category, x._2._2.quantity)) 

私は、これはこれは、人々はあなたがサンプルデータと一緒に試してみたものを示すことによって、あなたを助けるために役立つだろう

+0

ありがとうございます... ここで 'customer'と' products'は1つの値のペアです このようにして、または 'mapValues(x => x(0)、...)でクラスの値にアクセスすることはできません) ' –

+0

' customer'と 'products'は一つの値ではありません。彼らは 'Tuple2'です。もしそれらが '配列'として結合されていれば、 'x(0)'を行うことができますが、それらは 'タプル'にありますので、正しい値を得る方法は '_1 .....'を使うことです。 ''それは余分な作業になります。あなたはまだそれが明らかでない場合は、より多くを求めることができます。 :) –

0

に役立ちます願っています。私が正しくあなたの質問を理解していればとにかく、読みやすいアプローチは、最初の製品IDで、その後、データフレームにあなたのRDDSを変換し、それらに参加するだろう、次のように:スタックオーバーフローへ

val rddProduct = sc.parallelize(Seq(
    (101, "cat1"), 
    (102, "cat1"), 
    (103, "cat2"), 
    (104, "cat3") 
)) 

// Convert 1st RDD to DataFrame 
val dfProduct = rddProduct.toDF("productId", "category") 

val rddOrder = sc.parallelize(Seq(
    (1, 101, 10), 
    (1, 103, 5), 
    (2, 101, 15), 
    (2, 102, 10), 
    (2, 103, 10), 
    (3, 101, 15), 
    (3, 102, 5), 
    (3, 104, 5) 
)) 

// Convert 2nd RDD to DataFrame 
val dfOrder = rddOrder.toDF("customerId", "productId", "quantity") 

// Join dataframes by prodId 
val dfResult = dfOrder.as("o").join(
    dfProduct.as("p"), $"o.productId" === $"p.productId", "inner" 
). 
    select($"o.customerId", $"p.category", $"o.quantity"). 
    orderBy($"o.customerId", $"p.category", $"o.quantity") 

dfResult.show 
+----------+--------+--------+ 
|customerId|category|quantity| 
+----------+--------+--------+ 
|   1| cat1|  10| 
|   1| cat2|  5| 
|   2| cat1|  10| 
|   2| cat1|  15| 
|   2| cat2|  10| 
|   3| cat1|  5| 
|   3| cat1|  15| 
|   3| cat3|  5| 
+----------+--------+--------+ 

// Convert back to RDD, if necessary 
val rddResult = dfResult.rdd 
関連する問題