2016-09-08 2 views
0

私はRDDに次のエントリを持っています。グループRDDエントリ

(111,List(List(1473163148,abc))) 
(111,List(List(1473163143,def))) 
(111,List(List(1473163143,abd))) 
(111,List(List(1473163139,asd))) 
(111,List(List(1473163696,rtf))) 
(111,List(List(1473163700,rgd))) 
(111,List(List(1473163703,dmf)))  

それぞれの新しいエントリはリストが含まれますように、私は、新しいエントリにこれらのエントリのグループにしたいです30分以内に古いエントリを削除します。それは簡単なようですが、練習では私のコードはトリックをしません。

1473163143 1473163143 1473163148  
1473163139    
1473163696 1473163700 1473163703  
1473168932 

これらのタイムスタンプは秒であるため、彼らが必要:

val grouped = processed.reduceByKey((x,y) => x ++ y) 
val separated = grouped.flatMap { case (k, l) => MyFuncObj.createGroups(l).map(sublist => (k, sublist)) } 

object MyFuncObj { 

    def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = { 
    l.groupBy(_.productElement(0).toString.toLong/30*60).values 
    } 

} 

は、上記のデータにこのコードを適用した後、私は以下の結果(これは重要なポイントであるので、私は唯一のタイムスタンプを提供)を取得します

1473163143 1473163143 1473163148 1473163139 1473163696 1473163700 1473163703 
1473168932 

このタスクを解決するにはどうすればよいですか?

UPDATE:

をより明確にする:私は最初のレコードの時から始まる30分間のバケツを得ることを期待。

+0

質問はまだ不明である:期待される出力は、入力に表示されていないタイムスタンプを '1473168932'含まれています。 –

答えて

0

二つの問題がここにあります:あなたが最初のエントリの時に起動する「バケット」をしたい

  1. 場合 - あなたはあなたの前に、各タイムスタンプとその最初のタイムスタンプの間デルタを使用する必要があります

    0123:あなたは60で、その結果を乗算その後、30とで除している、代わりに(30*60)で割るの - 部門30*60の周りに括弧がありません

  2. を作ります

    scala> 5000/30*60 
    res0: Int = 9960 
    
    scala> 5000/(30*60) 
    res1: Int = 2 
    

全体で - これはあなたが必要なものをやっているようだ:

// sample data: 
val processed = sc.parallelize(List(
    (111,List(List(1473163148L, "abc"))), 
    (111,List(List(1473163143L,"def"))), 
    (111,List(List(1473163143L,"abd"))), 
    (111,List(List(1473163139L,"asd"))), 
    (111,List(List(1473163696L,"rtf"))), 
    (111,List(List(1473163700L,"rgd"))), 
    (111,List(List(1473168932L,"dmf")))) 
) 


// first - find the lowest timsestamp: 
// if input isn't ordered: 
val firstTimestamp: Long = processed.values.map { case List((l: Long) :: _) => l }.min() 

// if input is sorted by timestamp: 
val firstTimestamp: Long = processed.first()._2.head.head.toString.toLong 

def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = { 
    // divide the DELTA between each timestamp and first one by 30 minutes to find bucket: 
    l.groupBy(t => (firstTimestamp - t.productElement(0).toString.toLong)/(30*60)).values 
} 

// continue as you did: 
val grouped: RDD[(Int, List[List[Any]])] = processed.reduceByKey((x, y) => x ++ y) 
val separated: RDD[(Int, List[List[Any]])] = grouped.flatMap { 
    case (k, l) => createGroups(l).map(sublist => (k, sublist)) 
} 

separated.foreach(println) 
// prints: 
// (111,List(List(1473168932, dmf))) 
// (111,List(List(1473163148, abc), List(1473163143, def), List(1473163143, abd), List(1473163139, asd), List(1473163696, rtf), List(1473163700, rgd))) 
+0

ありがとうございます。唯一の問題は 'val firstTimestamp:Long = processed.values.map {case List((l:Long):: _)=> l} .min()'の行にある。 'Scruntineeはパターンタイプと互換性がありません、見つかった:Long、required:String'というエラーが表示されます。 – Lobsterrrr

+0

私は、タイムスタンプが入力データに 'Long'型であると仮定しました。あなたの質問は型を定義していませんでした。もしそれらがStringであれば、その行の型をStringに変更し、' l.toLong'を呼び出します。 –

+0

'val firstTimestamp:Long = processed.values.map {case List((l:String):: _)=> l.toLong} .min()'。これは私に 'java.lang.UnsupportedOperationException:empty collection'エラーを与える – Lobsterrrr

関連する問題