2016-08-01 16 views
0

私はapache sparkを使い始めています。 jsonログをフラット化されたメトリックに変換する必要があり、単純なcsvと見なすこともできます。JSONログからの集計メトリックをapache sparkで作成する

たとえば、

"orderId":1, 
    "orderData": { 
    "customerId": 123, 
    "orders": [ 
    { 
     "itemCount": 2, 
     "items": [ 
     { 
      "quantity": 1, 
      "price": 315 
     }, 
     { 
      "quantity": 2, 
      "price": 300 
     }, 

     ] 
    } 
    ] 
} 

これは、単一のJSONログとして考えることができ、私はにこれを変換したい、

orderId,customerId,totalValue,units 
    1 , 123 , 915 , 3 

私はsparkSQLドキュメントを通過されたと選択」のような個々の値のホールドを得るためにそれを使用することができますorderId、orderData.customerIdから注文 "しかし、私はすべての価格と単位の合計を取得する方法がわかりません。

これをapache sparkを使用して行うにはベストプラクティスが必要ですか?

+0

カントJSONを。 ( "/ path/to/file")。toDF(); df.registerTempTable( "df"); df.printSchema();その後、SQLを介して集計を実行しますか? –

+0

SQLを使って個々の要素を取得できますが、orders.itemsについてはわかりませんが、どうすればこの上で集計を実行できますか?私はそれがjsonの価値としてのみ来ると思う、私が何かを欠けている場合私を修正してください。 – fireants

+0

あなたは[this](http://xinhstechblog.blogspot.in/2015/06/reading-json-data-in-spark-dataframes.html)と[入れ子にされたjson](http:// xinhstechblog .blogspot.in/2016/05/reading-json-nested-array-in-spark.html) –

答えて

1

試してみてください。上記のJavaソリューションを探している人々のために

>>> from pyspark.sql.functions import * 
>>> doc = {"orderData": {"orders": [{"items": [{"quantity": 1, "price": 315}, {"quantity": 2, "price": 300}], "itemCount": 2}], "customerId": 123}, "orderId": 1} 
>>> df = sqlContext.read.json(sc.parallelize([doc])) 
>>> df.select("orderId", "orderData.customerId", explode("orderData.orders").alias("order")) \ 
... .withColumn("item", explode("order.items")) \ 
... .groupBy("orderId", "customerId") \ 
... .agg(sum("item.quantity"), sum(col("item.quantity") * col("item.price"))) 
+0

作業ロジックをありがとう、私はそれをjavaでマップし、他の人に投稿しようとします。 – fireants

0

は、従ってください:私たちはデータフレームDF = sqlContext.read()のように行う

SparkSession spark = SparkSession 
      .builder() 
      .config(conf) 
      .getOrCreate(); 

    SQLContext sqlContext = new SQLContext(spark); 

    Dataset<Row> orders = sqlContext.read().json("order.json"); 
    Dataset<Row> newOrders = orders.select(
      col("orderId"), 
      col("orderData.customerId"), 
      explode(col("orderData.orders")).alias("order")) 
      .withColumn("item",explode(col("order.items"))) 
      .groupBy(col("orderId"),col("customerId")) 
      .agg(sum(col("item.quantity")),sum(col("item.price"))); 
    newOrders.show(); 
関連する問題