2016-10-18 1 views
0

カウントの個々の列を取得してメトリクスを公開しようとしています。マップをフィルタリングする方法データフレーム内:Spark/Scala

val totalCustomers = df.count 

val totalPurchaseCount = df.filter("totalPurchase > 0").count 

val totalRentCount = df.filter("totalRent > 0").count 


publishMetrics("Total Customer", totalCustomers) 
publishMetrics("Total Purchase", totalPurchaseCount) 
publishMetrics("Total Rent", totalRentCount) 

publishMetrics("Percentage of Rent", percentage(totalRentCount, totalCustomers)) 
publishMetrics("Percentage of Purchase", percentage(totalPurchaseCount, totalCustomers)) 

private def percentageCalc(num: Long, denom: Long): Double = { 
val numD: Long = num 
val denomD: Long = denom 
return if (denomD == 0.0) 0.0 
else (numD/denomD) * 100 
} 

しかし、私はマップであるitemTypeCountsのためにこれを行うのですかわからない:私は今、私がやっているdf [customerId : string, totalRent : bigint, totalPurchase: bigint, itemTypeCounts: map<string, int> ]

を持っています。私は、各キー入力に基づいてカウントとパーセンテージを求めます。問題は、キー値が動的であることです。私は手前でキー値を知る方法がないことを意味します。どのキーがどのように各キー値のカウントを取得するか教えてくれますか?私はスカラ/スパーク、新しい各カラムの数を得るための他の効率的なアプローチは非常に感謝しています。

サンプルデータ:

customerId : 1 
totalPurchase : 17 
totalRent : 0 
itemTypeCounts : {"TV" : 4, "Blender" : 2} 

customerId : 2 
totalPurchase : 1 
totalRent : 1 
itemTypeCounts : {"Cloths" : 4} 

customerId : 3 
totalPurchase : 0 
totalRent : 10 
itemTypeCounts : {"TV" : 4} 

ので、出力は次のとおりです。

totalCustomer : 3 
totalPurchaseCount : 2 (2 customers with totalPurchase > 0) 
totalRent : 2 (2 customers with totalRent > 0) 
itemTypeCounts_TV : 2 
itemTypeCounts_Cloths : 1 
itemTypeCounts_Blender : 1 
+0

あなたがサンプルの入力データと所望の出力を提供してもらえますか? – LiMuBei

+0

@LiMuBeiサンプルデータを更新しました。 – Newbie

答えて

0

私は自分自身スパーク初心者ですので、これを行うには良い方法はおそらくあります。しかし、あなたが試みることのできる1つの方法は、itemTypeCountsを、作業できるスカラのデータ構造に変換することです。私は各行を(Name, Count)のペアのリストに変換しました。 List((Blender,2), (TV,4))

これで、このようなペアのリスト、各行のペアのリストを作成できます。あなたの例では、これは3つの要素のリストになります:あなたはこのような構造を持っていたら

List(
    List((Blender,2), (TV,4)), 
    List((Cloths,4)), 
    List((TV,4)) 
) 

、所望の出力にそれを変換することは、標準のスカラ座です。

働いた例は以下の通りです:

val itemTypeCounts = df.select("itemTypeCounts") 

//Build List of List of Pairs as suggested above 
val itemsList = itemTypeCounts.collect().map { 
    row => 
    val values = row.getStruct(0).mkString("",",","").split(",") 
    val fields = row.schema.head.dataType.asInstanceOf[StructType].map(s => s.name).toList 
    fields.zip(values).filter(p => p._2 != "null") 
}.toList 

// Build a summary map for the list constructed above 
def itemTypeCountsSummary(frames: List[List[(String, String)]], summary: Map[String, Int]) : Map[String, Int] = frames match { 
    case Nil => summary 
    case _ => itemTypeCountsSummary(frames.tail, merge(frames.head, summary)) 
} 

//helper method for the summary map. 
def merge(head: List[(String, String)], summary: Map[String, Int]): Map[String, Int] = { 
    val headMap = head.toMap.map(e => ("itemTypeCounts_" + e._1, 1)) 
    val updatedSummary = summary.map{e => if(headMap.contains(e._1)) (e._1, e._2 + 1) else e} 
    updatedSummary ++ headMap.filter(e => !updatedSummary.contains(e._1)) 
} 

val summaryMap = itemTypeCountsSummary(itemsList, Map()) 

summaryMap.foreach(e => println(e._1 + ": " + e._2)) 

出力:

itemTypeCounts_Blender: 1 
itemTypeCounts_TV: 2 
itemTypeCounts_Cloths: 1 
関連する問題