1

タイムスタンプ付きの位置データと、各位置に関連付けられた一連の文字列IDがあります。私は、ALA、一緒に現在のNと次のN行全体でこれらのフィーチャID列の全ての配列を引っ張ってスパークでウィンドウを使用したい:ウィンドウ内のすべての行にわたるユーザー定義関数

import sys 
from pyspark.sql.window import Window 
import pyspark.sql.functions as func 
windowSpec = Window \ 
    .partitionBy(df['userid']) \ 
    .orderBy(df['timestamp']) \ 
    .rowsBetween(-50, 50) 

dataFrame = sqlContext.table("locations") 
featureIds = featuresCollector(dataFrame['featureId']).over(windowSpec) 
dataFrame.select(
    dataFrame['product'], 
    dataFrame['category'], 
    dataFrame['revenue'], 
    featureIds.alias("allFeatureIds")) 

は、Sparkそうであれば、どのように可能、このですウィンドウ内のすべてのフィーチャIDを収集できるfeaturesCollectorのような関数を記述しますか?

答えて

1

スパークUDFを集約に使用することはできません。 Sparkは、カスタム集計に使用できるいくつかのツール(UserDefinedAggregateFunctionsAggregatorsAggregateExpressions)を提供しています。これらのツールのいくつかは、ウィンドウ処理で使用できますが、Pythonでは定義できません。

レコードを収集する場合は、collect_listのトリックが必要です。 非常に高価な操作であることに注意してください。

from pyspark.sql.functions import collect_list 

featureIds = collect_list('featureId').over(windowSpec) 
関連する問題