私は毎週データセットを集約する方法を必要としています。ここに私のデータセットがありますApache Spark |特定の時間枠集約
| date|organization_id|media_package_id|event_uuid |
+----------+---------------+----------------+-----------+
|2016-10-25| 1| 11| 76304d|
|2016-10-25| 1| 11| e6285b|
|2016-10-22| 2| 21| 16c04d|
|2016-10-22| 2| 21| 17804d|
|2016-10-22| 2| 21| 18904x|
|2016-10-21| 2| 21| 51564q|
|2016-10-07| 4| 98| 12874t|
|2016-10-05| 4| 98| 11234d|
+----------+---------------+----------------+-----------+
希望の集計結果を得るためにSparkジョブが毎日実行されているとします。そして、集計後のデータセットの上に、たとえば週単位で結果を欲しいと思います。ここで
| date|organization_id|media_package_id| count|
+----------+---------------+----------------+-----------+
|2016-10-24| 1| 11| 2|
|2016-10-17| 2| 21| 4|
|2016-10-03| 4| 98| 2|
+----------+---------------+----------------+-----------+
は、あなたはそれが私が何とか毎日の集計を行うために管理
(私は最善の方法だと思います)、週の最初の日を取っている日付列を参照してください場合。ここで私はここで
val data = MongoSupport.load(spark, "sampleCollection")
val dataForDates = data.filter(dataForDates("date").isin(dates : _*))
val countByDate = proofEventsForDates.groupBy("DATE", "ORGANIZATION_ID", "MEDIA_PACKAGE_ID")
.agg(count("EVENT_UUID").as("COUNT"))
val finalResult = impressionsByDate
.select(
col("DATE").as("date"),
col("ORGANIZATION_ID").as("organization_id"),
col("MEDIA_PACKAGE_ID").as("media_package_id"),
col("COUNT").as("count")
)
をやった方法です、データセットをフィルタリングするために初めに、私は少なくとも約一ヶ月の日付で構成され、特別なdates
リストを渡しています。そして、私は取得していた結果は、以降、私は、このデータセットの集約を毎週取得に見当もつかない(私が欲しいものではありません)ここで
| date|organization_id|media_package_id| count|
+----------+---------------+----------------+-----------+
|2016-10-25| 1| 11| 2|
|2016-10-22| 2| 21| 3|
|2016-10-21| 2| 21| 1|
|2016-10-07| 2| 21| 1|
|2016-10-05| 2| 21| 1|
+----------+---------------+----------------+-----------+
です。
'organization_id = 5'の行はどうなりましたか? – mtoto
@mtoto質問が編集されました。私はちょっとタイプミスだった – Switch
同じ行の期待される出力に基づいて 'media_package_id'は' 21'、いいえ? – mtoto