以下は私の火花データフレームであるpyspark動的な列の計算
a b c
1 3 4
2 0 0
4 1 0
2 2 0
式が
prev(c)-b+a
すなわち、
4-2+0=2
と
2-4+1=-1
は誰も私を助けてくださいすることができているよう
a b c
1 3 4
2 0 2
4 1 -1
2 2 3
の下
私の出力は次のようになりますこの障害を乗り越えるには?
以下は私の火花データフレームであるpyspark動的な列の計算
a b c
1 3 4
2 0 0
4 1 0
2 2 0
式が
prev(c)-b+a
すなわち、
4-2+0=2
と
2-4+1=-1
は誰も私を助けてくださいすることができているよう
a b c
1 3 4
2 0 2
4 1 -1
2 2 3
の下
私の出力は次のようになりますこの障害を乗り越えるには?
from pyspark.sql.functions import lag, udf
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
numbers = [[1,2,3],[2,3,4],[3,4,5],[5,6,7]]
df = sc.parallelize(numbers).toDF(['a','b','c'])
df.show()
w = Window().partitionBy().orderBy('a')
calculate = udf(lambda a,b,c:a-b+c,IntegerType())
df = df.withColumn('result', lag("a").over(w)-df.b+df.c)
df.show()
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
| 2| 3| 4|
| 3| 4| 5|
| 5| 6| 7|
+---+---+---+
+---+---+---+------+
| a| b| c|result|
+---+---+---+------+
| 1| 2| 3| null|
| 2| 3| 4| 2|
| 3| 4| 5| 3|
| 5| 6| 7| 4|
+---+---+---+------+
お返事ありがとうございました。しかし、私のデータフレームがこのような数であれば、これは機能しません。[= 1,1,2,3]、[1,1,2,3]、[2,2,3,4]、[3,3,4 、[5]、[3,3,4,5]、[3,3,4,5]、[4,5,6,7]] df = sc.parallelize(numbers).toDF(['cat' ( 'cat') df = df.withColumn( 'result'、lag( "a"))。 (w)-df.b + df.c)。最後に別のものを取るべきですか? –
何のエラーがありますか? – StackPointer
それはちょうど良い私の前にあなたのデータセットを与えました – StackPointer
これは役に立ちます。
import pyspark.sql.functions as f
from pyspark.sql.window import Window
df = sc.parallelize([
[1,3],
[2,0],
[4,1],
[2,2]
]).toDF(('a', 'b'))
df1 = df.withColumn("row_id", f.monotonically_increasing_id())
w = Window.partitionBy().orderBy(f.col("row_id"))
df1 = df1.withColumn("c_temp", f.when(f.col("row_id")==0, f.lit(4)).otherwise(- f.col("a") + f.col("b")))
df1 = df1.withColumn("c", f.sum(f.col("c_temp")).over(w)).drop("c_temp","row_id")
df1.show()
出力は次のとおりです。
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 3| 4|
| 2| 0| 2|
| 4| 1| -1|
| 2| 2| -1|
+---+---+---+
あなたがすでにこれを解決するために何かをしようとしたことを想定しています。あなたがしたことを教えてください。 – Grigoriy
あなたの問題は何ですか?以前の値を取得する方法とフィールドを合計する方法を簡単に検索することができます –
はい、私は以下のアプローチを使用しました アプローチが作成された新しい列c_newが1だけ遅れてc_new-a + bが後で解析されました以前に生成されたc_newの値から動的に取得する必要があります。 以下のコード DF = df.withColumn( 'c_new'、func.lag(DF [ 'C'])。上(Window.partitionBy( "A")。ORDERBY( "A"))) あります df = df.withColumn( 'Stock_New'、(df ['c' new]) - stock_output_table ['a']) 値を動的に取得する方法がわからないc_new –