コードからflinkジョブを停止/キャンセルしたい状況に陥っています。これは、私のフリンクジョブにタスクを提出し、結果を確認する統合テストです。ジョブが非同期で実行されるため、テストが失敗してもパスしなくても停止しません。テストが終わった後、私は仕事をやりたいコードからApache Flinkジョブを取り消す
私は、以下のリストしていますいくつか試してみました:jobmanagerへ をキャンセルするリクエストを送信、
- がjobmanager俳優
- は、実行中の各ジョブの場合、ジョブ
- を実行ゲット
これはもちろん実行されていませんが、jobmanagerのactorrefが間違っているのか、それとも他のものがないのか分かりません。
エラーは次のとおりです。[flink-akka.actor.default-dispatcher-5] [akka:// flink/user/jobmanager_1]メッセージ[org.apache.flink.runtime.messages.JobManagerMessages $ RequestRunningJobsStatus $ ]をアクタ[akka:// flink/temp/$ a]からアクタ[akka:// flink/user/jobmanager_1]に転送できませんでした。 [1]デッドレターが発生しました。このロギングは、構成設定「akka.log-dead-letters」と「akka.log-dead-letters-during-shutdown」を使用してオフまたは調整できます。
これは、ジョブマネージャのアクターrefが間違っているか、送信されたメッセージが正しくありません。
コードは、次のようになります。これは正しいアプローチであれば
val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS)) try { val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS)) if(result.isInstanceOf[RunningJobsStatus]){ val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages() val itr = runningJobs.iterator() while(itr.hasNext){ val jobId = itr.next().getJobId val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS))); try { Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS)) } catch { case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e } } } } catch{ case e : Exception => "Could not retrieve running jobs from the JobManager." + e } }
誰かがチェックすることはできますか?
EDIT: ジョブを完全に停止するには、まずTaskManagerをJobManagerの順序でTaskManagerとJobManagerの順に停止する必要があります。
こんにちは、あなたが指摘したことは正しいものでした。ジョブマネージャのリモートアドレスを取得する方法はありますか? FlinkMiniClusterは、環境によって直接作成されるため、アクセスできません。私は私の統合テストの一部であった仕事をやめたいと思っていました。今は私がFlinkMiniClusterを作成するコードを抽象化して、私がそれを制御して停止できるようにしました。 – Tej
現時点では、Flink APIはそのような機能を提供していません。しかし、私たちは実行中のジョブをより明示的に制御できる 'JobClient'を導入することでその改善に取り組んでいます。これまでのところ、自分のFlinkMiniClusterを作成し、それに統合テストの仕事を提出することが、あなたの記述を達成する最良の方法です。 –
おかげでたくさん! – Tej