2
Sparkのデータフレームdf
があり、時刻を表す列timestamp
があるとします。これは、UNIX形式の形式(1970年以降の秒数)です。私はSpark.Streamingをどのようにしてこれを入力として扱い、データをスライディングウィンドウで表示できるのですか? ありがとう!pyspark:時間列を持つデータフレームをスパークストリーミングオブジェクトに変換する方法は?
Sparkのデータフレームdf
があり、時刻を表す列timestamp
があるとします。これは、UNIX形式の形式(1970年以降の秒数)です。私はSpark.Streamingをどのようにしてこれを入力として扱い、データをスライディングウィンドウで表示できるのですか? ありがとう!pyspark:時間列を持つデータフレームをスパークストリーミングオブジェクトに変換する方法は?
少なくとも意味のある方法ではできません。それは可能であるが、このようなRDDからのストリームを作成するqueueStream
を使用する:キュー内のバッチとオブジェクトとの対応関係が1
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10)
df = sc.parallelize([(i,) for i in range(10000)]).toDF(["ts"])
stream = ssc.queueStream([df.rdd])
stream.count().pprint()
ssc.start()
ssc.awaitTermination()
:1。残念ながら、queueStream
は、Scalaのそれとは異なり、静的ストリームです。新しいデータが作成された後にエンキューすることはできません。これは、DataFrame
を手動で複数のRDDに分割したことを意味します。
ありがとうございます@ Zero323。あなたがそれをXYと呼んだとき、私が望むことをする別の方法があると思います。しかし、これは明示的に私がしたいことです。私は実際のストリーミング入力がある前に(古い蓄積データを別の方法で処理する)、テストと開発のために古いデータをストリーミングジョブに「再生」できるようにしたい。 –
OK、コメントを削除:)個人的に私はむしろストリームにデータをプッシュする簡単なスクリプトを作成することを検討したいと思います。 – zero323
ありがとうございます@ zero323。あなたはそのような単純なスクリプトがどのように見えるかを教えていただけますか? –