2017-12-21 16 views
1

Iterable [MyObject](groupByの前にRDD [MyObject]でした)に含まれるデータを "抽出"する必要があります。Scala RDD count by range

私の最初のRDD [MyObjectに]:

|-----------|---------|----------| 
| startCity | endCity | Customer | 
|-----------|---------|----------| 
| Paris  | London | ID | Age | 
|   |   |----|-----| 
|   |   | 1 | 1 | 
|   |   |----|-----| 
|   |   | 2 | 1 | 
|   |   |----|-----| 
|   |   | 3 | 50 | 
|-----------|---------|----------| 
| Paris  | London | ID | Age | 
|   |   |----|-----| 
|   |   | 5 | 40 | 
|   |   |----|-----| 
|   |   | 6 | 41 | 
|   |   |----|-----| 
|   |   | 7 | 2 | 
|-----------|---------|----|-----| 
| New-York | Paris | ID | Age | 
|   |   |----|-----| 
|   |   | 9 | 15 | 
|   |   |----|-----| 
|   |   | 10| 16 | 
|   |   |----|-----| 
|   |   | 11| 46 | 
|-----------|---------|----|-----| 
| New-York | Paris | ID | Age | 
|   |   |----|-----| 
|   |   | 13| 7 | 
|   |   |----|-----| 
|   |   | 14| 9 | 
|   |   |----|-----| 
|   |   | 15| 60 | 
|-----------|---------|----|-----| 
| Barcelona | London | ID | Age | 
|   |   |----|-----| 
|   |   | 17| 66 | 
|   |   |----|-----| 
|   |   | 18| 53 | 
|   |   |----|-----| 
|   |   | 19| 11 | 
|-----------|---------|----|-----| 

私はとGROUPBY startCityによって年齢の範囲でそれらをカウントする必要がある - endCity

は、最終的な結果は次のようになります。で

|-----------|---------|-------------| 
| startCity | endCity | Customer | 
|-----------|---------|-------------| 
| Paris  | London | Range| Count| 
|   |   |------|------| 
|   |   |0-2 | 3 | 
|   |   |------|------| 
|   |   |3-18 | 0 | 
|   |   |------|------| 
|   |   |19-99 | 3 | 
|-----------|---------|-------------| 
| New-York | Paris | Range| Count| 
|   |   |------|------| 
|   |   |0-2 | 0 | 
|   |   |------|------| 
|   |   |3-18 | 3 | 
|   |   |------|------| 
|   |   |19-99 | 2 | 
|-----------|---------|-------------| 
| Barcelona | London | Range| Count| 
|   |   |------|------| 
|   |   |0-2 | 0 | 
|   |   |------|------| 
|   |   |3-18 | 1 | 
|   |   |------|------| 
|   |   |19-99 | 2 | 
|-----------|---------|-------------| 

私はこれを3回、同じデータ(最初は0-2の範囲、次に10-20、そして21-99)とカウントすることでこれをやっています。

同様:

Iterable[MyObject] ite 

ite.count(x => x.age match { 
    case Some(age) => { age >= 0 && age < 2 } 
} 

それは私の整数を与えることによって働いているが、すべてではない、私は何度もカウントする必要があるため、私は考えて効率的に、これをしてください行うための最善の方法は何ですか?

おかげ

EDIT:あなたがする必要がある場合は :Customerオブジェクトがcase class MyObject(id : String, age : Int)

rdd 
    .map(x=> computeRange(x.age) -> 1) 
    .reduceByKey(_+_) 

編集のRDDと、その後

+0

は 'Customer'は配列ですか? –

+0

お客様はオブジェクトです – Drakax

+0

私たちがお手伝いできるようにオブジェクトタイプを共有できますか?それは事件の階級ですか? –

答えて

2
def computeRange(age : Int) = 
    if(age<=2) 
     "0-2" 
    else if(age<=10) 
     "2-10" 
    // etc, you get the idea 

ケースクラスですいくつかの列でグループ化すると、このようにすることができますあなたはRDD [(SomeColumns、Iterable [MyObject])]を持っています。次の行は、各「範囲」をその発生数に関連付けるマップを提供します。

def computeMapOfOccurances(list : Iterable[MyObject]) : Map[String, Int] = 
    list 
     .map(_.age) 
     .map(computeRange) 
     .groupBy(x=>x) 
     .mapValues(_.size) 

val result1 = rdd 
    .mapValues(computeMapOfOccurances(_)) 

そして、あなたはあなたのデータをフラット化する必要がある場合、あなたは書くことができます。

val result2 = result1 
    .flatMapValues(_.toSeq)  
+0

RDD [MyObject]の魅力のように働いていますが、私は初期のRDD [MyObject]を使ってIedable [MyObject] Iterableで。最初の質問は私の心の中ではっきりしていなかったので編集されました。申し訳ありません – Drakax

+0

私は私の答えも編集しました。希望はこれが助ける – Oli

+0

それは働いている、ありがとう! – Drakax

1

あなたはCustomer[Object]case class

case class Customer(ID: Int, Age: Int) 

そして、あなたのRDD[MyObject]次のようにrddあるとして持っていると仮定すると、 case classの数値は

case class MyObject(startCity: String, endCity: String, customer: List[Customer]) 
ですから、

MyObject(Paris,London,List(Customer(1,1), Customer(2,1), Customer(3,50))) 
MyObject(Paris,London,List(Customer(5,40), Customer(6,41), Customer(7,2))) 
MyObject(New-York,Paris,List(Customer(9,15), Customer(10,16), Customer(11,46))) 
MyObject(New-York,Paris,List(Customer(13,7), Customer(14,9), Customer(15,60))) 
MyObject(Barcelona,London,List(Customer(17,66), Customer(18,53), Customer(19,11))) 

以下のように入力します(表形式で持っていること)を有するべきであるそして、あなたはまた、あなたがIterable[MyObject]を持ってグループ化した後、ステップ以下と同等であることを言及したcase class ES上で使用して

val groupedRDD = rdd.groupBy(myobject => (myobject.startCity, myobject.endCity)) //groupedRDD: org.apache.spark.rdd.RDD[((String, String), Iterable[MyObject])] = ShuffledRDD[2] at groupBy at worksheetTest.sc:23 

だから、あなたがするために次のステップはIterable[MyObject]を反復処理するmapValuesを使用し、各範囲に属するage秒を数えることである、とfinall

​​が 再帰関数

def updateCounts(ageList: List[Int], map: Map[String, Int]) : Map[String, Int] = ageList match{ 
    case head :: tail => if(head >= 0 && head < 3) { 
    updateCounts(tail, map ++ Map("0-2" -> (map("0-2")+1))) 
    } else if(head >= 3 && head < 19) { 
    updateCounts(tail, map ++ Map("3-18" -> (map("3-18")+1))) 
    } else updateCounts(tail, map ++ Map("19-99" -> (map("19-99")+1))) 
    case Nil => map 
} 

CustomerOutそうfinalResultは以下のように

case class

case class CustomerOut(Range: String, Count: Int) 

ある

val finalResult = groupedRDD.mapValues(x => { 
    val rangeAge = Map("0-2" -> 0, "3-18" -> 0, "19-99" -> 0) 
    val list = x.flatMap(y => y.customer.map(z => z.Age)).toList 
    updateCounts(list, rangeAge).map(x => CustomerOut(x._1, x._2)).toList 
}) 

以下のように必要な出力に変換するY

((Barcelona,London),List(CustomerOut(0-2,0), CustomerOut(3-18,1), CustomerOut(19-99,2))) 
((New-York,Paris),List(CustomerOut(0-2,0), CustomerOut(3-18,4), CustomerOut(19-99,2))) 
((Paris,London),List(CustomerOut(0-2,3), CustomerOut(3-18,0), CustomerOut(19-99,3)))