1

私は基本的に私のドライバプログラムにイベントコールバックを書いて、そのイベントの到着時にsparkストリーミングアプリケーションを再起動したいと思っています。 私のドライバプログラムはファイルから構成を読み込んでストリームと実行ロジックを設定しています。 ファイルが変更されるたびに(新しいコンフィグが追加された)ドライバプログラムがシーケンスの次のステップを行う必要があり、スパークストリーミングアプリケーションを再起動する最善の方法は何ですか?

  1. を再起動し、
  2. は(mainメソッドの一部として)設定ファイルを読み込み、
  3. ストリームを設定する

これを達成する最も良い方法は何ですか?

答えて

0

次のように私は現在

  • がMQTTコールバックではMQTTトピックに

  • をサブスクライブすることにより、外部イベントを聞いて、この問題を解決しています、ストリーミングコンテキストssc.stop(true,true)を停止します正常にシャットダウンストリームとその基盤 スパーク設定

  • spark confを作成して再度起動し、 設定するP設定ファイルを読み込むことにより、ストリームは

// Contents of startSparkApplication() method 
sparkConf = new SparkConf().setAppName("SparkAppName") 
ssc = new StreamingContext(sparkConf, Seconds(1)) 
val myStream = MQTTUtils.createStream(ssc,...) //provide other options 
myStream.print() 
ssc.start() 
アプリケーションは春ブートアプリケーションとして構築されてい

0

Sparkを再起動する最善の方法は実際にはお使いの環境によって異なりますが、​​コンソールを使用することをお勧めします。

他のlinuxプロセスのように、​​プロセスをバックグラウンドにするには、shellのバックグラウンドに配置します。あなたの場合、​​ジョブは実際にはYARNでドライバを実行します。そのため、別のマシンで既にYARNを介して非同期に実行されているプロセスをベビーシッターしています。我々は(ここではスパークミートで)最近探求

Cloudera blog

0

一つの方法は、Sparkと並行して飼育係を使用することによって、これを達成することでした。一言で言えば、Apache Curatorを使用してZookeeperの変更(ZKの設定変更が外部イベントによって引き起こされる可能性がある)を監視し、リスナーを再起動させます。

参照コードベースがhereである場合、正常なシャットダウンと再読み込みの変更後に、設定が変更されるとWatcher(スパークストリーミングアプリ)が再起動します。これがポインタであることを願っています!

1

は、いくつかのケースでは、例えば(動的コンテキストストリーミングをリロードすることもできますストリーミング操作のリロード)。その場合には あなたは、(スカラ座例):

val sparkContext = new SparkContext() 

val stopEvent = false 
var streamingContext = Option.empty[StreamingContext] 
val shouldReload = false 

val processThread = new Thread { 
    override def run(): Unit = { 
    while (!stopEvent){ 
     if (streamingContext.isEmpty) { 

     // new context 
     streamingContext = Option(new StreamingContext(sparkContext, Seconds(1))) 

     // create DStreams 
      val lines = streamingContext.socketTextStream(...) 

     // your transformations and actions 
     // and decision to reload streaming context 
     // ... 

     streamingContext.get.start() 
     } else { 
     if (shouldReload) { 
      streamingContext.get.stop(stopSparkContext = false, stopGracefully = true) 
      streamingContext.get.awaitTermination() 
      streamingContext = Option.empty[StreamingContext] 
     } else { 
      Thread.sleep(1000) 
     } 
     } 

    } 
    streamingContext.get.stop(stopSparkContext =true, stopGracefully = true) 
    streamingContext.get.awaitTermination() 
    } 
} 

// and start it in separate thread 
processThread.start() 
processThread.join() 

またはのpython 中:

spark_context = SparkContext() 

stop_event = Event() 
spark_streaming_context = None 
should_reload = False 

def process(self): 
    while not stop_event.is_set(): 
     if spark_streaming_context is None: 

      # new context 
      spark_streaming_context = StreamingContext(spark_context, 0.5) 

      # create DStreams 
      lines = spark_streaming_context.socketTextStream(...) 

      # your transformations and actions 
      # and decision to reload streaming context 
      # ... 

      self.spark_streaming_context.start() 
     else: 
      # TODO move to config 
      if should_reload: 
       spark_streaming_context.stop(stopSparkContext=False, stopGraceFully=True) 
       spark_streaming_context.awaitTermination() 
       spark_streaming_context = None 
      else: 
       time.sleep(1) 
    else: 
     self.spark_streaming_context.stop(stopGraceFully=True) 
     self.spark_streaming_context.awaitTermination() 


# and start it in separate thread 
process_thread = threading.Thread(target=process) 
process_thread.start() 
process_thread.join() 

あなたはクラッシュから、あなたのコードを防ぎ、ストリーミングコンテキストを再起動する場合は最後の場所からcheckpointingメカニズムを使用してください。 失敗後にジョブの状態を復元することができます。

関連する問題