私の要件は、株式市場の1時間単位のデータを処理することです。 つまり、ストリーミング間隔ごとに1回データをソースから取得し、DStream経由で処理します。Sparkストリーミング受信機の周波数を設定する方法は?
私は、onStart()メソッドとonStop()メソッドとその動作を実装することによって、ウェブサイトをスクラップ/監視するカスタムレシーバを実装しました。
課題が発生しました:
- レシーバのスレッドが連続してすなわちデータをフェッチされ、倍数の時間を間隔ごと。
- 受信機とDStreamの実行時間間隔を調整できません。
オプションは、私が試した:
- レシーバスレッドは(間隔をストリーミングに等しい)数秒間スリープ状態に。 この場合、データは処理中の最新のデータではありません。 DSTREAM処理と同期してスパークストリーミング受信機を作るためにどのよう
class CustomReceiver(interval: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
def onStart() {
new Thread("Website Scrapper") {
override def run() { receive() }
}.start()
}
def onStop() {
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
println("Entering receive:" + new Date());
try {
while (!isStopped) {
val scriptsLTP = StockMarket.getLiveStockData()
for ((script, ltp) <- scriptsLTP) {
store(script + "," + ltp)
}
println("sent data")
System.out.println("going to sleep:" + new Date());
Thread.sleep(3600 * 1000);
System.out.println("awaken from sleep:" + new Date());
}
println("Stopped receiving")
restart("Trying to connect again")
} catch {
case t: Throwable =>
restart("Error receiving data", t)
}
println("Exiting receive:" + new Date());
}
}
?
ストリーミング間隔の開始時にデータを取得するのはオプションですか?あなたのビューごとに – maasg