私はRDDに次のエントリを持っています。グループRDDエントリ
(111,List(List(1473163148,abc)))
(111,List(List(1473163143,def)))
(111,List(List(1473163143,abd)))
(111,List(List(1473163139,asd)))
(111,List(List(1473163696,rtf)))
(111,List(List(1473163700,rgd)))
(111,List(List(1473163703,dmf)))
それぞれの新しいエントリはリストが含まれますように、私は、新しいエントリにこれらのエントリのグループにしたいです30分以内に古いエントリを削除します。それは簡単なようですが、練習では私のコードはトリックをしません。
1473163143 1473163143 1473163148
1473163139
1473163696 1473163700 1473163703
1473168932
これらのタイムスタンプは秒であるため、彼らが必要:
val grouped = processed.reduceByKey((x,y) => x ++ y)
val separated = grouped.flatMap { case (k, l) => MyFuncObj.createGroups(l).map(sublist => (k, sublist)) }
object MyFuncObj {
def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
l.groupBy(_.productElement(0).toString.toLong/30*60).values
}
}
は、上記のデータにこのコードを適用した後、私は以下の結果(これは重要なポイントであるので、私は唯一のタイムスタンプを提供)を取得します
1473163143 1473163143 1473163148 1473163139 1473163696 1473163700 1473163703
1473168932
このタスクを解決するにはどうすればよいですか?
UPDATE:
をより明確にする:私は最初のレコードの時から始まる30分間のバケツを得ることを期待。
質問はまだ不明である:期待される出力は、入力に表示されていないタイムスタンプを '1473168932'含まれています。 –