2016-09-12 8 views
6

私はRDD [行]:Scala:String値のGroupByの合計を取得するには?

|---itemId----|----Country-------|---Type----------| 
    |  11  |  US   |  Movie  | 
    |  11  |  US   |  TV   | 
    |  101  |  France  |  Movie  |  

各行は別々のJSONオブジェクトは(RDDの各行)ここでIは、JSONのリストとして結果を保存することができるように、GROUPBYのitemIdを行う方法:

{"itemId" : 11, 
"Country": {"US" :2 },"Type": {"Movie" :1 , "TV" : 1} }, 
{"itemId" : 101, 
"Country": {"France" :1 },"Type": {"Movie" :1} } 

RDD:

は、私が試した:

import com.mapping.data.model.MappingUtils 
import com.mapping.data.model.CountryInfo 


val mappingPath = "s3://.../"  
val input = sc.textFile(mappingPath) 

入力のリストですいくつかのいずれかが、私はどのように私はこれを達成することができます知らせることができ

val MappingsList = input.map(x=> { 
        val countryInfo = MappingUtils.getCountryInfoString(x); 
        (countryInfo.getItemId(), countryInfo) 
       }).collectAsMap 

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo] 


def showCountryInfo(x: Option[CountryInfo]) = x match { 
     case Some(s) => s 
    } 


val events = sqlContext.sql("select itemId EventList") 

val itemList = events.map(row => { 
    val itemId = row.getAs[String](1); 
    val çountryInfo = showTitleInfo(MappingsList.get(itemId)); 
    val country = if (countryInfo.getCountry() == 'unknown)' "US" else countryInfo.getCountry() 
    val type = countryInfo.getType() 

    Row(itemId, country, type) 
     }) 

:各行は、私はJSONの解析と変換の世話をするMappingUtilsを使用してPOJOクラスCountryInfoにマッピングしていますJSONでjsons?

ありがとうございました!

+0

RDD [行]はDataFrame/DataSetから来ましたか? RDDを使って作業する[Row]は、まだ実行可能ですが、一般的には理想的ではありません。 –

+0

私はデータセットからRDDを作成しました。 –

+0

@ASpotySpotが私のRDDで更新されました –

答えて

3

私は余分な時間を費やすことはできませんが、あなたには始めることができます。

RDD[Row]をJSON構造を表す単一のマップに集約するという考え方です。集計は2つの関数のパラメータを必要と倍です:

  1. seqOpターゲットタイプのうちの2つをマージする方法ターゲット・タイプへ
  2. combOpの要素のコレクションを折る方法。マージ中にseqOpで見られた値のカウントを蓄積する必要があるよう

トリッキーな部分は、combOpに入っています。私はキャッチする飛行機を持っているので、これを運動として残しました!もしあなたが困っているなら、他の誰かがそのギャップを埋めることができたらいいと思います。

case class Row(id: Int, country: String, tpe: String) 

    def foo: Unit = { 

    val rows: RDD[Row] = ??? 

    def seqOp(acc: Map[Int, (Map[String, Int], Map[String, Int])], r: Row) = { 
     acc.get(r.id) match { 
     case None => acc.updated(r.id, (Map(r.country, 1), Map(r.tpe, 1))) 
     case Some((countries, types)) => 
      val countries_ = countries.updated(r.country, countries.getOrElse(r.country, 0) + 1) 
      val types_ = types.updated(r.tpe, types.getOrElse(r.tpe, 0) + 1) 
      acc.updated(r.id, (countries_, types_)) 
     } 
    } 

    val z = Map.empty[Int, (Map[String, Int], Map[String, Int])] 

    def combOp(l: Map[Int, (Map[String, Int], Map[String, Int])], r: Map[Int, (Map[String, Int], Map[String, Int])]) = { 
     l.foldLeft(z) { case (acc, (id, (countries, types))) => 
      r.get(id) match { 
      case None => acc.updated(id, (countries, types)) 
      case Some(otherCountries, otherTypes) => 
       // todo - continue by merging countries with otherCountries 
       // and types with otherTypes, then update acc 
      } 
     } 
    } 

    val summaryMap = rows.aggregate(z) { seqOp, combOp } 
関連する問題