2017-09-11 24 views
1

これは、私の入力データがどのように見えるかですスパークScalaのCSV入力

20170101,2024270,1000,1000,1000,1000,1000,1000,1000,2000,2000 
20170101,2024333,1000,1000,1000,1000,1000,1000,1000,2000,2000 
20170101,2023709,1000,1000,1000,1000,1000,1000,1000,2000,2000 
20170201,1234709,1000,1000,1000,1000,1000,1000,1000,2000,2000 

と私はキーが整数であり、値はJSONオブジェクトであるkeyValueのRDDに同じことを変換すると、目的は、私は、コードの以下の部分を使用して同じことを行うことができるよPythonで

( 
2024270, { 
"metrics": { 
    "date" : 20170201, 
    "style_id" : 1234709, 
    "revenue" : 1000, 
    "list_count" : 1000, 
    "pdp_count" : 1000, 
    "add_to_cart_count" : 1000 
} 
} 
) 

をElasticSearchために同じを書くことです

metrics_rdd = sc.textFile('s3://myntra/scm-inbound/fifa/poc/size_curve_date_range_old/*').map(format_metrics) 


def format_metrics(line): 
    tokens = line.split('^') 
    try: 
     return (tokens[1], { 
        'metrics': { 
         'date': tokens[0], 
         'mrp': float(tokens[2]), 
         'revenue': float(tokens[3]), 
         'quantity': int(tokens[4]), 
         'product_discount': float(tokens[5]), 
         'coupon_discount': float(tokens[6]), 
         'total_discount': float(tokens[7]), 
         'list_count': int(tokens[8]), 
         'add_to_cart_count': int(tokens[9]), 
         'pdp_count': int(tokens[10]) 
        } 
       }) if len(tokens) > 1 else ('', dict()) 

しかし、Scalaで同じことを達成する方法を理解できず、Scalaの初心者でも、以下の出力を得ることはできましたが、JSONを「メトリック」ブロックにラップすることはできませんでした。役に立った?

ordersDF.withColumn("key", $"style_id") 
     .withColumn("json", to_json(struct($"date", $"style_id", $"mrp"))) 
     .select("key", "json") 
     .show(false) 

// Exiting paste mode, now interpreting. 

+-------+-------------------------------------------------+ 
|key |json            | 
+-------+-------------------------------------------------+ 
|2024270|{"date":20170101,"style_id":2024270,"mrp":1000.0}| 
|2024333|{"date":20170101,"style_id":2024333,"mrp":1000.0}| 
|2023709|{"date":20170101,"style_id":2023709,"mrp":1000.0}| 
|1234709|{"date":20170201,"style_id":1234709,"mrp":1000.0}| 
+-------+-------------------------------------------------+ 
+0

は、キー/値のRDDにそれを変換したいです)また、json形式のデータを格納し、次に出力をES –

+1

に書き込みます。 'json'列を' metrics'として名前を変更し、結果のDataframeで 'toJSON'を呼び出します。 – philantrovert

+0

ありがとう@philantrovertそれが働いた –

答えて

1

@philantrovertが示唆したことを試してみました。

scala> val ordersDF = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallMetrics.csv") 
ordersDF: org.apache.spark.sql.DataFrame = [date: int, style_id: int ... 9 more fields] 

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

ordersDF.withColumn("key", $"style_id") 
     .withColumn("metrics", to_json(struct($"date", $"style_id", $"mrp"))) 
     .select("key", "metrics") 
     .toJSON 
     .show(false) 

// Exiting paste mode, now interpreting. 

+-----------------------------------------------------------------------------------+ 
|value                    | 
+-----------------------------------------------------------------------------------+ 
|{"key":2024270,"metrics":"{\"date\":20170101,\"style_id\":2024270,\"mrp\":1000.0}"}| 
|{"key":2024333,"metrics":"{\"date\":20170101,\"style_id\":2024333,\"mrp\":1000.0}"}| 
|{"key":2023709,"metrics":"{\"date\":20170101,\"style_id\":2023709,\"mrp\":1000.0}"}| 
|{"key":1234709,"metrics":"{\"date\":20170201,\"style_id\":1234709,\"mrp\":1000.0}"}| 
+-----------------------------------------------------------------------------------+ 

私はJson4sライブラリを使用して他の方法でも試してみましたが、それも働いていた、私は他のRDD(次元の情報でそれに参加したいので

def convertRowToJSON(row: Row) = { 

    val json = 
    ("metrics" -> 
     ("date" -> row(1).toString) ~ 
     ("style_id" -> row.getInt(1)) ~ 
     ("mrp" -> row.getFloat(2)) ~ 
     ("revenue" -> row.getFloat(3)) ~ 
     ("quantity" -> row.getInt(1)) ~ 
     ("product_discount" -> row.getFloat(3)) ~ 
     ("coupon_discount" -> row.getFloat(3)) ~ 
     ("total_discount" -> row.getFloat(3)) ~ 
     ("list_count" -> row.getInt(1)) ~ 
     ("add_to_cart_count" -> row.getInt(1)) ~ 
     ("pdp_count" -> row.getInt(1)) 
    ) 
    (row.getInt(1),compact(render(json)).toString) 
} 

scala> val ordersDF = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallMetrics.csv").map(convertRowToJSON) 
ordersDF: org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string] 

scala> ordersDF.show(false) 
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 
|_1  |_2                                                            | 
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 
|2024270|{"metrics":{"date":"2024270","style_id":2024270,"mrp":1000.0,"revenue":1000.0,"quantity":2024270,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2024270,"add_to_cart_count":2024270,"pdp_count":2024270}}| 
|2024333|{"metrics":{"date":"2024333","style_id":2024333,"mrp":1000.0,"revenue":1000.0,"quantity":2024333,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2024333,"add_to_cart_count":2024333,"pdp_count":2024333}}| 
|2023709|{"metrics":{"date":"2023709","style_id":2023709,"mrp":1000.0,"revenue":1000.0,"quantity":2023709,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2023709,"add_to_cart_count":2023709,"pdp_count":2023709}}| 
|1234709|{"metrics":{"date":"1234709","style_id":1234709,"mrp":1000.0,"revenue":1000.0,"quantity":1234709,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":1234709,"add_to_cart_count":1234709,"pdp_count":1234709}}| 
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 
+0

'json4s'もかなり良いです。いい答え。あなた自身の答えを受け入れ、この質問を閉じることができます。 – philantrovert