2017-09-09 5 views
2

私はYARNを使用していて、SparkがYARNを使用してクラスタにジョブを送信した仕組みを理解しようとしました。だから私はソースの中に掘ったと私たちは仕事(例えばforeach)を提出する場合は、次の方法がSparkContext::runJobで実行されていることがわかった:スパークはどのようにしてクラスタにジョブを送信しますか?

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) 

その後、JobWaiter

DAGScheduler::submitJob 

JobSubmittedで作成されていますイベントが公開されています。このイベントは、リスナーバスに別のイベント(SparkListenerJobStart)を公開する

で処理されています。次に起動する

DAGScheduler::submitStage 

したがって、ステージにはクラスターにステージが送信されるロジックがあるはずです。しかし、唯一の私はこれが見た:

private def submitStage(stage: Stage) { 
    val jobId = activeJobForStage(stage) 
    if (jobId.isDefined) { 
     logDebug("submitStage(" + stage + ")") 
     if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { 
     val missing = getMissingParentStages(stage).sortBy(_.id) 
     logDebug("missing: " + missing) 
     if (missing.isEmpty) { 
      logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") 
      submitMissingTasks(stage, jobId.get) 
     } else { 
      for (parent <- missing) { 
      submitStage(parent) 
      } 
      waitingStages += stage 
     } 
     } 
    } else { 
     abortStage(stage, "No active job for stage " + stage.id, None) 
    } 
    } 

実際には、私はそこにどこかのNIOのようなコードを期待した。

QUESTION:我々はYARNを使用する場合、どのドライバプログラムは、クラスタと通信していますか?コードはどこにありますか?誰か助けてくれますか?

答えて

1

あなたが知っているように、Sparkは複数のクラスタマネージャで実行できます。 Sparkは、SchedulerBackendという抽象を使用してこれを実現します。糸について

、2つの実装があります。https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

  1. YarnClientSchedulerBackend(クライアント配布モード)
  2. YarnSchedulerBackend(クラスタ配備モード)

は、ここではソースコードであります

関連する問題