2017-08-09 9 views
0

内のトランザクションに基づいてイベントの順序をランク付けするように機能する、と言う:Iは、次のデータフレームを有するスパークデータフレーム

+----------+------------+------------+------------+---+ 
| day |inout_amount|prev_balance|post_balance|row| 
+----------+------------+------------+------------+---+ 
|2016-10-29|  -17000|  17000|   0| 1| 
|2016-10-30|  -17000|  17000|   0| 2| 
|2016-10-30|  5600|   0|  5600| 3| 
|2016-10-30|  5600|  5600|  11200| 4| 
|2016-10-30|  5800|  11200|  17000| 5| 
+----------+------------+------------+------------+---+ 

最初の行が「2016年10月29日」の正しいが4行下( "2016-10-30")がシャッフルされます。ここでは上記の表のコードです:

case class transaction(
day: String, 
inout_amount: Int, 
prev_balance: Int, 
post_balance: Int 
) 

val snippet = Seq(
transaction("2016-10-29", -17000, 17000, 0), 
transaction("2016-10-30", -17000, 17000, 0), 
transaction("2016-10-30", 5600, 0, 5600), 
transaction("2016-10-30", 5600, 5600, 11200), 
transaction("2016-10-30", 5800, 11200, 17000) 
) 

// below could be sparkContext if you working in zeppelin 
val df = sqlContext.createDataFrame(snippet) 

import org.apache.spark.sql.expressions.Window 

val window = Window.orderBy("day") 

df.withColumn("row", row_number().over(window)).show 

は、私は今、「prev_balanceは」前のトランザクションの「post_balance」に等しいという論理に基づいて、「2016年10月30日」の行をランク付けする必要があります。すなわち、目的のデータフレームは、次のようになります。

+----------+------------+------------+------------+---+-------+ 
|  day|inout_amount|prev_balance|post_balance|row|order-1| 
+----------+------------+------------+------------+---+-------+ 
|2016-10-29|  -17000|  17000|   0| 1|  0| 
|2016-10-30|  -17000|  17000|   0| 2|  4| 
|2016-10-30|  5600|   0|  5600| 3|  1| 
|2016-10-30|  5600|  5600|  11200| 4|  2| 
|2016-10-30|  5800|  11200|  17000| 5|  3| 
+----------+------------+------------+------------+---+-------+ 

を私は助けてください...スパークに新しいですし、私は「UDF」を作成し、「withColumn」とそれを適用する必要があることを推測します!

答えて

0

あなたはグループを作成し、日付を分類し、評価基準に従って値をソートし、結果をflatMapする必要があります。これはRDDで簡単に行うことができます。

+0

ミシェルはこれを試していますが、望ましい結果を得ることはできません。 –

関連する問題