2016-05-25 11 views
2

Sparkのデータフレームdfがあり、時刻を表す列timestampがあるとします。これは、UNIX形式の形式(1970年以降の秒数)です。私はSpark.Streamingをどのようにしてこれを入力として扱い、データをスライディングウィンドウで表示できるのですか? ありがとう!pyspark:時間列を持つデータフレームをスパークストリーミングオブジェクトに変換する方法は?

答えて

0

少なくとも意味のある方法ではできません。それは可能であるが、このようなRDDからのストリームを作成するqueueStreamを使用する:キュー内のバッチとオブジェクトとの対応関係が1

from pyspark.streaming import StreamingContext 

ssc = StreamingContext(sc, 10) 
df = sc.parallelize([(i,) for i in range(10000)]).toDF(["ts"]) 
stream = ssc.queueStream([df.rdd]) 
stream.count().pprint() 

ssc.start() 
ssc.awaitTermination() 

:1。残念ながら、queueStreamは、Scalaのそれとは異なり、静的ストリームです。新しいデータが作成された後にエンキューすることはできません。これは、DataFrameを手動で複数のRDDに分割したことを意味します。

+0

ありがとうございます@ Zero323。あなたがそれをXYと呼んだとき、私が望むことをする別の方法があると思います。しかし、これは明示的に私がしたいことです。私は実際のストリーミング入力がある前に(古い蓄積データを別の方法で処理する)、テストと開発のために古いデータをストリーミングジョブに「再生」できるようにしたい。 –

+0

OK、コメントを削除:)個人的に私はむしろストリームにデータをプッシュする簡単なスクリプトを作成することを検討したいと思います。 – zero323

+0

ありがとうございます@ zero323。あなたはそのような単純なスクリプトがどのように見えるかを教えていただけますか? –

関連する問題