これは、私の入力データがどのように見えるかですスパークScalaのCSV入力
20170101,2024270,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170101,2024333,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170101,2023709,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170201,1234709,1000,1000,1000,1000,1000,1000,1000,2000,2000
と私はキーが整数であり、値はJSONオブジェクトであるkeyValueのRDDに同じことを変換すると、目的は、私は、コードの以下の部分を使用して同じことを行うことができるよPythonで
(
2024270, {
"metrics": {
"date" : 20170201,
"style_id" : 1234709,
"revenue" : 1000,
"list_count" : 1000,
"pdp_count" : 1000,
"add_to_cart_count" : 1000
}
}
)
をElasticSearchために同じを書くことです
metrics_rdd = sc.textFile('s3://myntra/scm-inbound/fifa/poc/size_curve_date_range_old/*').map(format_metrics)
def format_metrics(line):
tokens = line.split('^')
try:
return (tokens[1], {
'metrics': {
'date': tokens[0],
'mrp': float(tokens[2]),
'revenue': float(tokens[3]),
'quantity': int(tokens[4]),
'product_discount': float(tokens[5]),
'coupon_discount': float(tokens[6]),
'total_discount': float(tokens[7]),
'list_count': int(tokens[8]),
'add_to_cart_count': int(tokens[9]),
'pdp_count': int(tokens[10])
}
}) if len(tokens) > 1 else ('', dict())
しかし、Scalaで同じことを達成する方法を理解できず、Scalaの初心者でも、以下の出力を得ることはできましたが、JSONを「メトリック」ブロックにラップすることはできませんでした。役に立った?
ordersDF.withColumn("key", $"style_id")
.withColumn("json", to_json(struct($"date", $"style_id", $"mrp")))
.select("key", "json")
.show(false)
// Exiting paste mode, now interpreting.
+-------+-------------------------------------------------+
|key |json |
+-------+-------------------------------------------------+
|2024270|{"date":20170101,"style_id":2024270,"mrp":1000.0}|
|2024333|{"date":20170101,"style_id":2024333,"mrp":1000.0}|
|2023709|{"date":20170101,"style_id":2023709,"mrp":1000.0}|
|1234709|{"date":20170201,"style_id":1234709,"mrp":1000.0}|
+-------+-------------------------------------------------+
は、キー/値のRDDにそれを変換したいです)また、json形式のデータを格納し、次に出力をES –
に書き込みます。 'json'列を' metrics'として名前を変更し、結果のDataframeで 'toJSON'を呼び出します。 – philantrovert
ありがとう@philantrovertそれが働いた –