2016-09-25 17 views
1

クライアントモードでYarn(バージョン2.6.0)を使用するHadoopクラスタでSparkアプリケーション(バージョン1.6.0)を実行しています。私は長い計算を実行するコードを持っており、時間がかかりすぎたら(そして、代わりに他の関数を実行して)それを強制終了したい。ここ
は一例です:Apache Spark:コード内のジョブをキャンセルして実行中のタスクを強制終了する方法は?

val conf = new SparkConf().setAppName("TIMEOUT_TEST") 
val sc = new SparkContext(conf) 
val lst = List(1,2,3) 
// setting up an infite action 
val future = sc.parallelize(lst).map(while (true) _).collectAsync() 

try { 
    Await.result(future, Duration(30, TimeUnit.SECONDS)) 
    println("success!") 
} catch { 
    case _:Throwable => 
     future.cancel() 
     println("timeout") 
} 

// sleep for 1 hour to allow inspecting the application in yarn 
Thread.sleep(60*60*1000) 
sc.stop() 

タイムアウトは30秒に設定されたが、もちろん計算は無限大であり、かつので、将来の結果をお待ちしていますその後、キャッチされ、例外がスローされますされます未来がキャンセルされ、バックアップ機能が実行されます。
キャンセルされたジョブが完全に終了しないことを除いて、これはすべて正常に動作します。アプリケーションのWeb UIを見ると、ジョブは失敗とマークされますが、内部にまだ実行中のタスクがあることがわかります。

SparkContext.cancelAllJobsまたはSparkContext.cancelJobGroupを使用すると同じことが起こります。問題は、私が自分のプログラムで乗り越えることができたとしても、キャンセルされた仕事の実行中のタスクは、依然として貴重な資源を奪っていることです。

まとめ:Sparkジョブを、そのジョブの実行中のすべてのタスクも終了するにはどうしたらいいですか? setJobGroupによると

+0

どのように例外の場合にSparkContextの停止について、すなわちSparkContext.stop()とその後の仕事のための新しいSparkContextを初期化します。 – suj1th

+0

残念ながら、SparkContextはプロジェクト全体で共有されるため、これはオプションではありません。それを停止して新しいモジュールを開始すると、閉じたSparkContextへの参照を保持しているので、他のモジュールは失敗します –

答えて

0

(新しいタスクを実行しているから、ジョブを停止しますが、現在実行中のタスクが完了しせされ、今何が起こるかではなく):

「interruptOnCancelは、ジョブグループをtrueに設定されている場合は、ジョブの取り消しによって、スレッドのexecutorスレッドでThread.interrupt()が呼び出されます。

だからあなたのマップ内の安野機能は、このような割り込みでなければなりません:

val future = sc.parallelize(lst).map(while (!Thread.interrupted) _).collectAsync() 
関連する問題