2016-10-10 7 views
2

から最初と最後の日付を選択します私は薬剤が下ケースクラスであるフォームのグループ化RDD(patientIDを、[薬物])、持ってグループ化されたRDD

val grpMeds = medication.groupBy(_.patientID) 

ここで、投薬はRDD [投薬]という形式のRDDです。

各患者について、最も早い日付と最新の日付を検索しようとしています。特定の種類の薬「medicine_A」が投与されました(薬はcase class Medicationの方法です)。私が入手しようとしているのは、RDD [patientID、earliestDate、latestDate]の形式のRDDですが、これを取得する方法を理解することはできません。

ご協力いただければ幸いです。データは以下のようになります(grpMeds.take(0).foreach(println)から取得)。 groupByを使用して

Medication(000961291-01,Tue Jun 21 19:45:00 UTC 2005,Isotonic Saline (0.9%)) 
Medication(000096430-01,Mon Nov 15 20:45:00 UTC 2010,insulin aspart) 
+0

だから何が '分を使用して間違っています'と' '最大'?サンプルデータと期待される結果は役に立ちます。 – sgeddes

+0

日付はjava.utils.Date関数です。私はそれが最小/最大メソッドを持っているとは思わないが、私はdate1before(date2)を使用することができます。 'grpMeds.take(0).foreach(println)'が返すものの例を追加しました。 – mongolol

答えて

3

があり得ることは非常に非効率的な方法です。代わりにSpark SQLまたはreduceByKeyを使用することをお勧めします。

あなたが DataFramemedicationを変換する必要がありスパークSQLの場合

import spark.implicits._ // import sqlContext.implicits._ 

val medicationDF = medication.toDF 

agg続いgroupByを使用します。

medicationDF.groupBy($"patientID", $"medicine").agg(min($"date"), max($"date")) 

このソリューションdateためjava.sql.Dateまたはjava.sql.Timestampでなければなりません。 reduceByKeyについては

まずあなたが重複dateあるpatientIdmedicineから構成キーと値を取得するためにmedicationを再構築する必要があります

val medicationPairs = medication.map(m => 
    ((m.patientID, m.medicine), (m.date, m.date)) 
) 

reduceByKey

medicationPairs.reduceByKey { 
    case ((xMin, xMax), (yMin, yMax)) => (
    if(xMin.before(yMin)) xMin else yMin, 
    if(xMax.after(yMax)) xMax else yMax 
) 
} 
関連する問題