-2
私はすべてのjsons(top/total)で各placeNameのパーセンタイルを見つけることができるように、Total(総数)とTop Elements(フィルタ後の数)を取得しようとしています。評価> 3:外部結合を行う方法:Spark Scala SQLContext
// sc : An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("temp.txt")
//df.show()
val res = df.withColumn("visited", explode($"visited"))
val result = res.groupBy($"customerId", $"visited.placeName")
Tried with joins :
val result1 =res.groupBy($"customerId", $"visited.placeName").agg(count("*").alias("total"))
val result2 = res
.filter($"visited.rating" < 4)
.groupBy($"requestId", $"visited.placeName")
.agg(count("*").alias("top"))
result1.show()
result2.show()
percentile = result1.join(result2, List("placeName","customerId"), "outer")
sqlContext.sql("select top/total as percentile from temp groupBy placeName")
ただし、私にエラーが発生します。
val result1 = result.withColumn("Top", getCount(res , true))
.withColumn("Total",getCount(result, false)).show()
def getCount(df: DataFrame, flag: Boolean): Int {
if (flag == "true") return df.filter($"visited.rating" < 3).groupBy($"customerId", $"visited.placeName").agg(count("*"))
else return df.agg(count("*"))
}
マイスキーマ:
は、私のようなUDF何かでこれを行うことができます
{
"country": "France",
"customerId": "France001",
"visited": [
{
"placeName": "US",
"rating": "2",
"famousRest": "N/A",
"placeId": "AVBS34"
},
{
"placeName": "US",
"rating": "3",
"famousRest": "SeriousPie",
"placeId": "VBSs34"
},
{
"placeName": "Canada",
"rating": "3",
"famousRest": "TimHortons",
"placeId": "AVBv4d"
}
]
}
US top = 1 count = 3
Canada top = 1 count = 3
{
"country": "Canada",
"customerId": "Canada012",
"visited": [
{
"placeName": "UK",
"rating": "3",
"famousRest": "N/A",
"placeId": "XSdce2"
},
]
}
UK top = 1 count = 1
{
"country": "France",
"customerId": "France001",
"visited": [
{
"placeName": "US",
"rating": "4.3",
"famousRest": "N/A",
"placeId": "AVBS34"
},
{
"placeName": "US",
"rating": "3.3",
"famousRest": "SeriousPie",
"placeId": "VBSs34"
},
{
"placeName": "Canada",
"rating": "4.3",
"famousRest": "TimHortons",
"placeId": "AVBv4d"
}
]
}
US top = 2 count = 3
Canada top = 1 count = 3
だから、最後に私のようなものが必要です
PlaceName percentile
US 57.14 (1+1+2)/(3+1+3) *100
Canada 33.33 (1+1)/(3+3) *100
UK 100 1*100
スキーマ:
をroot
|-- country: string(nullable=true)
|-- customerId:string(nullable=true)
|-- visited: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- placeId: string (nullable = true)
| | |-- placeName: string (nullable = true)
| | |-- famousRest: string (nullable = true)
| | |-- rating: string (nullable = true)
私は意図したことを追加しようとしていました。別のクエリを使用して結合を試みました。val res = df.withColumn( "visited"、explode($ "visited")) 'val result1 = res.groupBy($" customerId "、$" visited.placeName ") .gg(count( "*")。エイリアス( "total")) val result2 = res .filter($ "visited.rating" <4) .groupBy($ "requestId"、$ "visited.placeName ") .agg(数(" * ")の別名。(" トップ」)) result1.show() result2.show() ' –
と使用が加わり:'パーセンタイル= result1.join(結果2、リスト( "placeName"、 "customerId")、 "outer")sqlContext.sql( "temp groupBy placeNameからのパーセンタイルとしてトップ/トータルを選択") 'しかし、エラーも発生します。それがなぜpdfを試してみたかったのですか(カウントを追加するために(トップとトータル))。これにアプローチしてパーセンタイルを得る方法を教えてください –