2016-12-28 14 views
1

私は何百万ものレコードを持つ製品情報ファイルを持っています。 CSVファイルは次のようになります。 sparkを使用して列の内容を置き換える方法

 
    Product CategoryName SalesUnit Other Columns... 
     p1   a12    41
p2 x5 72
p3 x5 69
p4 c21 80
p5 b16 59
p6 x5 75 .. .. ..
そして、次のようにマッピングファイル(CategoryCode < - > CategoryName)があります。マッピングファイルは、約200の記録があります。最終的には
 
    CategoryCode CategoryName
1.0 a12
2.0 b13 3.0 b16 4.0 c12
5.0 c21
6.0 x5
.. ..
を、私はCategoryCodeで区分名の値を置き換えたい:によってずに交換を達成するために
 
    udf { (CategoryName: String) => 
     if (CategoryName.trim() == "a12") 1.0 
     else if (CategoryName.trim() == "b13") 2.0 
     else if (CategoryName.trim() == "b16") 3.0 
     else if (CategoryName.trim() == "c12") 4.0 
     else if (CategoryName.trim() == "c21") 5.0 
     else if (CategoryName.trim() == "x5") 6.0 
     else if (CategoryName.trim() == "z12") 7.0 
     else if (...) ... 
     ... ... 
     else 999.0 
    } 
その他のエレガントなアプローチ:
 
    Product Category SalesUnit Other Colulmns.. 
    p1   1.0   41
p2 6.0 72
p3 6.0 69
p4 5.0 80
p5 3.0 59
p6 6.0 75 .. .. ..
私のアプローチは、スパークデータフレームのUDFを使用することです非常に多くの場合、コーディング... else節?ありがとう。

答えて

3

はトリミングされたカテゴリのCSVでのマッピングファイルに参加し、あなたが

+0

ありがとうございます、あなたのアプローチは私よりも優れています。 :) –

2

を必要とするフィールドだけを選択するあなたはカテゴリ名にDATAFRAMEの両方に参加して、あなたが後でそれを必要とされていないとして区分名自体をドロップすることができます。

あなたはこのような何かを行うことができます。

scala> //Can have more columns , have taken just these columns just to demonstrate 

scala> val df1=sc.parallelize(Seq(("p1","a12",41),("p2","x5",72),("p3","x5",69))).toDF("Product","CategoryName","SalesUnit") 
df1: org.apache.spark.sql.DataFrame = [Product: string, CategoryName: string ... 1 more field] 

scala> //Category code dataFrame 

scala> val df2=sc.parallelize(Seq((1.0,"a12"),(4.0,"c12"),(5.0,"c21"),(6.0,"x5"))).toDF("CategoryCode","CategoryName") 
df2: org.apache.spark.sql.DataFrame = [CategoryCode: double, CategoryName: string] 

scala> val resultDF=df1.join(df2,"CategoryName").withColumnRenamed("CategoryCode","Category").drop("CategoryName") 
resultDF: org.apache.spark.sql.DataFrame = [Product: string, SalesUnit: int ... 1 more field] 

scala> resultDF.show() 
+-------+---------+--------+              
|Product|SalesUnit|Category| 
+-------+---------+--------+ 
|  p1|  41|  1.0| 
|  p2|  72|  6.0| 
|  p3|  69|  6.0| 
+-------+---------+--------+ 

P.S:これは、ほんの少しのデモンストレーションです。

+0

あなたの答えを非常に感謝します。あなたのデモンストレーションは私にとってとても役に立ちます。私はArnon Rotem-Gal-Ozの答えをすぐに返答します。 –

+0

@JeromeLi:そして私はそれを打ち砕いた! –

関連する問題