6
私はRDD [行]:Scala:String値のGroupByの合計を取得するには?
|---itemId----|----Country-------|---Type----------|
| 11 | US | Movie |
| 11 | US | TV |
| 101 | France | Movie |
各行は別々のJSONオブジェクトは(RDDの各行)ここでIは、JSONのリストとして結果を保存することができるように、GROUPBYのitemIdを行う方法:
{"itemId" : 11,
"Country": {"US" :2 },"Type": {"Movie" :1 , "TV" : 1} },
{"itemId" : 101,
"Country": {"France" :1 },"Type": {"Movie" :1} }
RDD:
は、私が試した:
import com.mapping.data.model.MappingUtils
import com.mapping.data.model.CountryInfo
val mappingPath = "s3://.../"
val input = sc.textFile(mappingPath)
入力のリストですいくつかのいずれかが、私はどのように私はこれを達成することができます知らせることができ
val MappingsList = input.map(x=> {
val countryInfo = MappingUtils.getCountryInfoString(x);
(countryInfo.getItemId(), countryInfo)
}).collectAsMap
MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo]
def showCountryInfo(x: Option[CountryInfo]) = x match {
case Some(s) => s
}
val events = sqlContext.sql("select itemId EventList")
val itemList = events.map(row => {
val itemId = row.getAs[String](1);
val çountryInfo = showTitleInfo(MappingsList.get(itemId));
val country = if (countryInfo.getCountry() == 'unknown)' "US" else countryInfo.getCountry()
val type = countryInfo.getType()
Row(itemId, country, type)
})
:各行は、私はJSONの解析と変換の世話をするMappingUtilsを使用してPOJOクラスCountryInfoにマッピングしていますJSONでjsons?
ありがとうございました!
RDD [行]はDataFrame/DataSetから来ましたか? RDDを使って作業する[Row]は、まだ実行可能ですが、一般的には理想的ではありません。 –
私はデータセットからRDDを作成しました。 –
@ASpotySpotが私のRDDで更新されました –