私はYARN
を使用していて、SparkがYARNを使用してクラスタにジョブを送信した仕組みを理解しようとしました。だから私はソースの中に掘ったと私たちは仕事(例えばforeach
)を提出する場合は、次の方法がSparkContext::runJob
で実行されていることがわかった:スパークはどのようにしてクラスタにジョブを送信しますか?
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
その後、JobWaiter
が
DAGScheduler::submitJob
とJobSubmitted
で作成されていますイベントが公開されています。このイベントは、リスナーバスに別のイベント(SparkListenerJobStart
)を公開する
で処理されています。次に起動する
DAGScheduler::submitStage
したがって、ステージにはクラスターにステージが送信されるロジックがあるはずです。しかし、唯一の私はこれが見た:
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
実際には、私はそこにどこかのNIOのようなコードを期待した。
QUESTION:我々はYARN
を使用する場合、どのドライバプログラムは、クラスタと通信していますか?コードはどこにありますか?誰か助けてくれますか?