クライアントモードでYarn(バージョン2.6.0)を使用するHadoopクラスタでSparkアプリケーション(バージョン1.6.0)を実行しています。私は長い計算を実行するコードを持っており、時間がかかりすぎたら(そして、代わりに他の関数を実行して)それを強制終了したい。ここ
は一例です:Apache Spark:コード内のジョブをキャンセルして実行中のタスクを強制終了する方法は?
val conf = new SparkConf().setAppName("TIMEOUT_TEST")
val sc = new SparkContext(conf)
val lst = List(1,2,3)
// setting up an infite action
val future = sc.parallelize(lst).map(while (true) _).collectAsync()
try {
Await.result(future, Duration(30, TimeUnit.SECONDS))
println("success!")
} catch {
case _:Throwable =>
future.cancel()
println("timeout")
}
// sleep for 1 hour to allow inspecting the application in yarn
Thread.sleep(60*60*1000)
sc.stop()
タイムアウトは30秒に設定されたが、もちろん計算は無限大であり、かつので、将来の結果をお待ちしていますその後、キャッチされ、例外がスローされますされます未来がキャンセルされ、バックアップ機能が実行されます。
キャンセルされたジョブが完全に終了しないことを除いて、これはすべて正常に動作します。アプリケーションのWeb UIを見ると、ジョブは失敗とマークされますが、内部にまだ実行中のタスクがあることがわかります。
SparkContext.cancelAllJobsまたはSparkContext.cancelJobGroupを使用すると同じことが起こります。問題は、私が自分のプログラムで乗り越えることができたとしても、キャンセルされた仕事の実行中のタスクは、依然として貴重な資源を奪っていることです。
まとめ:Sparkジョブを、そのジョブの実行中のすべてのタスクも終了するにはどうしたらいいですか? setJobGroupによると
どのように例外の場合にSparkContextの停止について、すなわちSparkContext.stop()とその後の仕事のための新しいSparkContextを初期化します。 – suj1th
残念ながら、SparkContextはプロジェクト全体で共有されるため、これはオプションではありません。それを停止して新しいモジュールを開始すると、閉じたSparkContextへの参照を保持しているので、他のモジュールは失敗します –