2017-06-12 7 views
1

私の要件は、株式市場の1時間単位のデータを処理することです。 つまり、ストリーミング間隔ごとに1回データをソースから取得し、DStream経由で処理します。Sparkストリーミング受信機の周波数を設定する方法は?

私は、onStart()メソッドとonStop()メソッドとその動作を実装することによって、ウェブサイトをスクラップ/監視するカスタムレシーバを実装しました。

課題が発生しました:

  • レシーバのスレッドが連続してすなわちデータをフェッチされ、倍数の時間を間隔ごと。
  • 受信機とDStreamの実行時間間隔を調整できません。

オプションは、私が試した:

  1. レシーバスレッドは(間隔をストリーミングに等しい)数秒間スリープ状態に。 この場合、データは処理中の最新のデータではありません。 DSTREAM処理と同期してスパークストリーミング受信機を作るためにどのよう

enter image description here

enter image description here

class CustomReceiver(interval: Int) 
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) { 

    def onStart() { 
    new Thread("Website Scrapper") { 
     override def run() { receive() } 
    }.start() 
    } 

    def onStop() { 

    } 

    /** Create a socket connection and receive data until receiver is stopped */ 
    private def receive() { 
    println("Entering receive:" + new Date()); 
    try { 
     while (!isStopped) { 
     val scriptsLTP = StockMarket.getLiveStockData() 
     for ((script, ltp) <- scriptsLTP) { 
      store(script + "," + ltp) 
     } 
     println("sent data") 
     System.out.println("going to sleep:" + new Date()); 
     Thread.sleep(3600 * 1000); 
     System.out.println("awaken from sleep:" + new Date()); 
     } 
     println("Stopped receiving") 
     restart("Trying to connect again") 
    } catch { 
     case t: Throwable => 
     restart("Error receiving data", t) 
    } 
    println("Exiting receive:" + new Date()); 
    } 
} 

+0

ストリーミング間隔の開始時にデータを取得するのはオプションですか?あなたのビューごとに – maasg

答えて

0

このユースケースは、Spark Streamingには適していないようです。この間隔は、これを通常のバッチジョブと見なすには十分です。こうすることで、クラスタリソースをより有効に活用できます。

mapPartitionsを使用して、分散型Webスクラッパーとしてエグゼキュータを使用し、意図したとおりに処理することで、ターゲットティッカーを並列化してスパークジョブとして書き直します。

次に、正確な時刻にクロノスなどの先進的な代替方法を使用して、0時間ごとに実行するようにSparkジョブをスケジュールします。

+0

、ストリーミングジョブの可能な最大間隔は何ですか? –

+0

@VijayInnamuriはユースケースに依存しているので、数字を入れるのは大切なことではありません。あなたが処理しなければならないデータが絶えず来ていて、1時間に1回のレポートしか必要としないなら、私はSpark Streamingを検討します。ここでのポイントは、Spark Streamingは**ストリーミング**データであり、あなたのユースケースは特定の時間に実行する必要があるより多くのバッチジョブに似ています。また、Spark Streamingは実行時にクラスタリソースを割り当てます。あなたのユースケースは、1時間ごとに数分の処理に似ています。そのような低い使用量のリソースの良い塊を常にブロックすることは有害でしょう。 – maasg

+0

清潔さをありがとう。 –

関連する問題