ような何かを行うことができます。
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag
df = sc.parallelize([
(1473678846, 2), (1473678852, 1),
(1473679029, 3), (1473679039, 3),
(1473679045, 2), (1473679055, 1)
]).toDF(["time", "door_status"])
w = Window().orderBy("time")
(df
.withColumn("prev_status", lag("door_status", 1).over(w))
.where(col("door_status") != col("prev_status"))
.groupBy("door_status", "prev_status")
.count())
これは縮尺通りではありません。 mapParitions
を試すことができます。私たちは、このように操作を行うことができ、これらの2つと
def merge(x, y):
"""Given a pair of tuples:
(first-state, last-state, counter_of changes)
return a tuple of the same shape representing aggregated results
>>> merge((None, None, Counter()), (1, 1, Counter()))
(None, 1, Counter())
>>> merge((1, 2, Counter([(1, 2)])), (2, 2, Counter()))
(None, 2, Counter({(1, 2): 1}))
>>> merge((1, 2, Counter([(1, 2)])), (3, 2, Counter([(3, 2)])
(None, 2, Counter({(1, 2): 1, (2, 3): 1, (3, 2): 1}))
"""
(_, last_x, cnt_x), (first_y, last_y, cnt_y) = x, y
# If previous partition wasn't empty update counter
if last_x is not None and first_y is not None and last_x != first_y:
cnt_y[(last_x, first_y)] += 1
# Merge counters
cnt_y.update(cnt_x)
return (None, last_y, cnt_y)
:
partials = (df.rdd
.mapPartitions(process_partition)
.collect())
reduce(merge, [(None, None, Counter())] + partials)
@ zero323ありがとうございます。最初の方法が働いた。あなたが述べたように、大量のデータでは遅く見えます。私はまだ2番目の方法を試しています。 – Sisyphus
@DavidArenburg私が従うかわからない。あなたは詳しく説明できますか? – zero323
@DavidArenburgいいえ、もちろんです。私は、異なるパーティションレイアウトでテストスイートを実行するために使用しました。ありがとうございました! – zero323