2016-05-31 16 views
0

提供されたhttpリンクからストリーミングされたデータを読み取るカスタムスパーク受信機を使用しています。提供されたhttpリンクが正しくない場合、受信側は失敗します。問題は、スパークが受信機を連続的に再起動し、アプリケーションが決して終了しないということです。問題は、受信機に障害が発生した場合にアプリケーションを終了するようにSparkに指示する方法です。スパークストリーミング:受信者の障害後に受信者を再起動しない方法

これが私たちのカスタム受信機の抽出物である:

def onStart() { 
    // Start the thread that receives data over a connection 
    new Thread("Receiver") { 
     override def run() { receive() } 
    }.start() 
    } 

    private def receive(): Unit = { 
    .... 
    val response: CloseableHttpResponse = httpclient.execute(req) 
    try { 
     val sl = response.getStatusLine() 
     if (sl.getStatusCode != 200){ 
     val errorMsg = "Error: " + sl.getStatusCode 
     val thrw = new RuntimeException(errorMsg) 
     stop(errorMsg, thrw) 
     } else { 
     ... 
     store(doc) 
     } 

私たちは、この受信機を使用するアプリケーションをストリーミング火花を持っている:予想通り、受信機がエラーを持っていない場合

val ssc = new StreamingContext(sparkConf, duration) 
val changes = ssc.receiverStream(new CustomReceiver(... 
... 
ssc.start() 
ssc.awaitTermination() 

すべてが動作します。受信機が失敗した場合(例えば、間違ったhttpリンクを使用した場合)、sparkは継続的に再起動し、アプリケーションは決して終了しません。

16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it. 

受信者が失敗した場合、アプリケーション全体を終了したいだけです。

+0

悲しいことに、現在非難されています – Jake

答えて

0

SparkストリーミングでのスケジューリングがReceiverTracker自体を停止させないまでReceiverTrackerが失敗した受信機を再起動し続けるように動作するようです。

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L618

ReceiverTrackerを停止するには、我々は、アプリケーション全体を停止する必要があります。したがって、受信者自身からこのプロセスを制御する方法はないようです。

2

カスタム受信機ベースの火花ストリーミングアプリケーションのライフサイクルを制御する方法があります。アプリケーションのジョブ進捗リスナーを定義し、何が起きているかを把握します。

class CustomReceiverListener extends StreamingJobProgressListener { 
    private boolean receiverStopped = false; 

    public CustomReceiverListener(StreamingContext ssc) { super(ssc);} 

    public boolean isReceiverStopped() { 
     return receiverStopped; 
    } 
    @Override 
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) { 
     LOG.info("Update the flag field"); 
     this.receiverStopped = true; 
    } 
} 

ドライバでは、スレッドを初期化して、receiverStoppedフラグの状態を監視します。このスレッドが終了すると、ドライバはストリームアプリを停止します。 (よりよいアプローチは、ストリーミングアプリケーションを停止するドライバによって定義されたコールバックメソッドを定義することです)。

CustomReceiverListener listener = new CustomReceiverListener(ssc); 
ssc.addStreamingListener(listener); 
ssc.start(); 
Thread thread = new Thread(() -> { 
    while (!listener.isReceiverStopped()) { 
     LOG.info("Sleepy head..."); 
     try { 
      Thread.sleep(2 * 1000); /*check after 2 seconds*/ 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
}); 
thread.start(); 
thread.join(); 
LOG.info("Listener asked to die! Going to commit suicide :("); 
ssc.stop(true, false); 

注:はあなたの受信機の複数のインスタンスの場合は、すべての受信インスタンスが停止していることを確認するCustomReceiverListenerの実装を変更。

+0

私はこれが推奨されていません – Jake

1

私はどこでもJVM内から自由にカスタム受信機を起動、停止する方法を発見しました。プロトタイプですが、ストレステストはしていません。レシーバスーパーバイザなどの階層はシャットダウンされませんが、カスタム受信者が再起動されるまでは本質的にアイドルになります。

シングルトンHashMapを保持するオブジェクトを作成スパークAPI

  • の意図を尊重しているようです。カスタムレシーバと希望のステータス(有効または無効)をマップに保持します。

;;;

case class IKodaTextSocketReceiverStatus(receiver:IKodaTextSocketReceiver,enabled:Boolean) 
{ 

} 

object IKodaTextSocketReceiver extends Logging 
{ 
    val receiverMap:mutable.HashMap[String,IKodaTextSocketReceiverStatus]= mutable.HashMap[String,IKodaTextSocketReceiverStatus]() 


    def restartReceiver(recId:String):Boolean= 
    { 
    if(receiverMap.get(recId).isDefined) 
    { 
     logger.info("Found existing receiver") 

     receiverMap.put(recId,new IKodaTextSocketReceiverStatus(receiverMap.get(recId).get.receiver,true)) 
     receiverMap.get(recId).get.receiver.onStart() 
     true 

    } 
    else 
    { 
     false 
    } 
    } 

} 

有効なブール値はどこからでも設定できます。受信機の内側または外側のいずれか。私はreceiveメソッドから設定しました。onStartメソッドは、受信側の有効なステータスをチェックします。それがfalseの場合、それは新しいスレッドで受け取る方法を再起動していない.....と沈黙があります:)

class IKodaTextSocketReceiver(host: String, port: Int, receiverId:String) 
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { 

    IKodaTextSocketReceiver.receiverMap.put(receiverId,new IKodaTextSocketReceiverStatus(this,true)) 




    def isEnabled(): Boolean = 
    { 
    val bo = IKodaTextSocketReceiver.receiverMap.get(receiverId) 
    if(bo.isDefined) 
     { 
     bo.get.enabled 
     } 
    else 
     { 
     false 
     } 
    } 

    def onStart() { 
    // Start the thread that receives data over a connection 
    if(isEnabled()) 
    { 
     logger.info("Starting IKodaTextSocketReceiver") 
     if(!super.isStopped()) { 
     new Thread("IKodaTextSocketReceiver") { 
      override def run() { 
      receive() 
      } 
     }.start() 
     } 
     else 
     { 
      logger.info("Receiver disabled") 
     } 
    } 
    else 
     { 
     logger.warn("Restarting after stop set") 
     } 
    } 

    def onStop() { 
    // There is nothing much to do as the thread calling receive() 
    // is designed to stop by itself isStopped() returns false 

    } 



    /** Create a socket connection and receive data until receiver is stopped */ 
    private def receive() { 
    var socket: Socket = null 
    var userInput: String = null 
    var keepReceiving=true; 
    try { 
     logger.info(s"Connecting to $host : $port") 
     socket = new Socket(host, port) 
     logger.info(s"Connected to $host : $port") 
     val reader = new BufferedReader(
     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) 
     userInput = reader.readLine() 
     while(!isStopped && userInput != null) { 
     store(userInput) 
     userInput = reader.readLine() 
     if(userInput.contains(StreamingConstants.IKODA_END_STREAM)) 
      { 
      logger.info(s"${StreamingConstants.IKODA_END_STREAM}: Calling disabling receiver") 
      IKodaTextSocketReceiver.receiverMap.put(receiverId,new IKodaTextSocketReceiverStatus(this,false)) 
      //stop("Exiting, hopefully with elegance and dignity.") 
      } 
     } 
     reader.close() 
     socket.close() 
     logger.info("Stopped receiving") 
     restart("Trying to connect again",keepReceiving) 
    } catch { 
     case e: java.net.ConnectException => 
     restart(s"Error connecting to $host : $port"+e.getMessage,keepReceiving) 
     case t: Throwable => 
     restart("Error receiving data"+t.getMessage,keepReceiving) 
    } 
    } 
は、単にそれが HashMapに登録されているかどうかをチェックすることにより、カスタムの受信機を起動し

。そうでない場合は、新しいものを作成します。そうであれば、有効に設定されます。

 def doStream(ip: String, port: Int): Unit = { 

    try { 
     if(!IKodaTextSocketReceiver.restartReceiver("MyFirstReceiver")) 
     { 
     val ssc = getSparkStreamingContext(fieldVariables) 
     ssc.checkpoint("./ikoda/cp") 
     logger.info("open stream to " + ip + " " + port) 


     val ikReceiver = new IKodaTextSocketReceiver(ip, port, "MyFirstReceiver") 
     val lines: DStream[String] = ssc.receiverStream(ikReceiver) 


     etc etc etc 
関連する問題