2017-09-19 11 views
0

以下は私の火花データフレームであるpyspark動的な列の計算

a b c 
1 3 4 
2 0 0 
4 1 0 
2 2 0 
式が prev(c)-b+aすなわち、 4-2+0=22-4+1=-1

は誰も私を助けてくださいすることができているよう

a b c 
1 3 4 
2 0 2 
4 1 -1 
2 2 3 

の下

私の出力は次のようになりますこの障害を乗り越えるには?

+0

あなたがすでにこれを解決するために何かをしようとしたことを想定しています。あなたがしたことを教えてください。 – Grigoriy

+0

あなたの問題は何ですか?以前の値を取得する方法とフィールドを合計する方法を簡単に検索することができます –

+0

はい、私は以下のアプローチを使用しました アプローチが作成された新しい列c_newが1だけ遅れてc_new-a + bが後で解析されました以前に生成されたc_newの値から動的に取得する必要があります。 以下のコード DF = df.withColumn( 'c_new'、func.lag(DF [ 'C'])。上(Window.partitionBy( "A")。ORDERBY( "A"))) あります df = df.withColumn( 'Stock_New'、(df ['c' new]) - stock_output_table ['a']) 値を動的に取得する方法がわからないc_new –

答えて

2
from pyspark.sql.functions import lag, udf 
from pyspark.sql.types import IntegerType 
from pyspark.sql.window import Window 

numbers = [[1,2,3],[2,3,4],[3,4,5],[5,6,7]] 
df = sc.parallelize(numbers).toDF(['a','b','c']) 
df.show() 

w = Window().partitionBy().orderBy('a') 
calculate = udf(lambda a,b,c:a-b+c,IntegerType()) 
df = df.withColumn('result', lag("a").over(w)-df.b+df.c) 
df.show() 



+---+---+---+ 
| a| b| c| 
+---+---+---+ 
| 1| 2| 3| 
| 2| 3| 4| 
| 3| 4| 5| 
| 5| 6| 7| 
+---+---+---+ 

+---+---+---+------+ 
| a| b| c|result| 
+---+---+---+------+ 
| 1| 2| 3| null| 
| 2| 3| 4|  2| 
| 3| 4| 5|  3| 
| 5| 6| 7|  4| 
+---+---+---+------+ 
+0

お返事ありがとうございました。しかし、私のデータフレームがこのような数であれば、これは機能しません。[= 1,1,2,3]、[1,1,2,3]、[2,2,3,4]、[3,3,4 、[5]、[3,3,4,5]、[3,3,4,5]、[4,5,6,7]] df = sc.parallelize(numbers).toDF(['cat' ( 'cat') df = df.withColumn( 'result'、lag( "a"))。 (w)-df.b + df.c)。最後に別のものを取るべきですか? –

+0

何のエラーがありますか? – StackPointer

+0

それはちょうど良い私の前にあなたのデータセットを与えました – StackPointer

0

これは役に立ちます。

import pyspark.sql.functions as f 
from pyspark.sql.window import Window 

df = sc.parallelize([ 
    [1,3], 
    [2,0], 
    [4,1], 
    [2,2] 
]).toDF(('a', 'b')) 

df1 = df.withColumn("row_id", f.monotonically_increasing_id()) 
w = Window.partitionBy().orderBy(f.col("row_id")) 
df1 = df1.withColumn("c_temp", f.when(f.col("row_id")==0, f.lit(4)).otherwise(- f.col("a") + f.col("b"))) 
df1 = df1.withColumn("c", f.sum(f.col("c_temp")).over(w)).drop("c_temp","row_id") 
df1.show() 

出力は次のとおりです。

+---+---+---+ 
| a| b| c| 
+---+---+---+ 
| 1| 3| 4| 
| 2| 0| 2| 
| 4| 1| -1| 
| 2| 2| -1| 
+---+---+---+ 
関連する問題