2017-11-13 18 views
0

スパークデータフレームをループする方法はありますか? は私がで構成されていたデータフレームを持っている:スパークデータフレームをループする方法

time, id, direction 
10, 4, True //here 4 enters --> (4,) 
20, 5, True //here 5 enters --> (4,5) 
34, 5, False //here 5 leaves --> (4,) 
67, 6, True //here 6 enters --> (4,6) 
78, 6, False //here 6 leaves --> (4,) 
99, 4, False //here 4 leaves -->() 

それは時間でソートされ、今、私はステップスルーし、有効なIDを蓄積したいと思います。 IDSは、その結果RDDこの

time, valid_ids 
(10, (4,)) 
(20, (4,5)) 
(34, (4,)) 
(67, (4,6)) 
(78, (4,) 
(99,()) 

のようになります。私は、これは並列化しないことを知っているが、DFがそれほど大きくない。==真と方向に出る==方向に虚偽

を入力します。では、スパーク/スカラでどうやってこのことができますか?

答えて

2

データが小さい場合(「でもdfはそれほど大きくない」)、Scalaコレクションを使用して収集して処理するだけです。以下に示すようなタイプがある場合:

df.printSchema 
root 
|-- time: integer (nullable = false) 
|-- id: integer (nullable = false) 
|-- direction: boolean (nullable = false) 

あなたが収集できます

val data = df.as[(Int, Int, Boolean)].collect.toSeq 

scanLeftvar

val result = data.scanLeft((-1, Set[Int]())){ 
    case ((_, acc), (time, value, true)) => (time, acc + value) 
    case ((_, acc), (time, value, false)) => (time, acc - value) 
}.tail 
1

使用はスカラ開発者のために推奨されるが、それでも私はされていません投稿の回答はvar

var collectArray = Array.empty[Int] 
df.rdd.collect().map(row => { 
    if(row(2).toString.equalsIgnoreCase("true")) collectArray = collectArray :+ row(1).asInstanceOf[Int] 
    else collectArray = collectArray.drop(1) 
    (row(0), collectArray.toList) 
}) 

これはあなたが結果与える必要があり

(10,List(4)) 
(20,List(4, 5)) 
(34,List(5)) 
(67,List(5, 6)) 
(78,List(6)) 
(99,List()) 
として
関連する問題