2017-10-03 8 views
0

基本的には、共有メモリからデータを読み込むために、同じノード/エグゼキュータ上で複数のタスクを実行します。そのためには、タスクを開始する前にデータをメモリにロードする初期化関数が必要です。もしSparkがExecutorの起動のためのフックを提供するならば、この初期化コードをそのコールバック関数に入れて、この起動が完了した後にタスクが実行されるようにすることができます。SparkでExecutor Startupのフックがありますか?

私の質問は、スパークはそのようなフックを提供していますか?そうでない場合は、他の方法で、私は同じを達成することができますか?

答えて

0

複数のタスク(1つのアプリインスタンス、1つのSparkタスク)を実行できるように、アプリの複数のインスタンスを実行する必要はありません。複数のスレッドが同じSparkSessionオブジェクトを使用して、Sparkタスクを並列に送信できます。

だから、このように動作可能性があります

  • アプリケーションが起動し、メモリ内の共有データをロードするために初期化関数を実行します。 SharedDataクラスオブジェクトに書き込んでください。
  • スレッドプールが作成される
  • SparkSessionが作成され、各スレッドは(SparkSession、SharedData)へのアクセスを有する
  • 各スレッドが共有SparkSessionとSharedData オブジェクトを使用して、スパークタスクを作成するオブジェクト。
  • あなたのユースケースに応じて、アプリケーションは、次のいずれかを実行します。完了するためにすべてのタスクの
    • 待ちをしてから、新しい要求が到着するのを
    • 待機ループでセッションをスパーク閉じ、新しいスパークを作成します必要に応じてスレッドプールからスレッドを使用してタスクを実行します。あなたが使用して同時にキャンセルすることができsetJobGroupので、関連するタスクを使用してタスクにグループをsetJobDescriptionを使用してタスクの説明を割り当てたり、割り当てのようなスレッドごとの事をやりたいとき

SparkContext(sparkSession.sparkContext)が便利ですcancelJobGroup。同じプールを使用するタスクの優先順位を調整することもできます。詳しくはhttps://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-applicationを参照してください。 ドライバアプリケーションとスパークがそれをシリアライズし、執行(1回)のそれぞれに送信するには、一度データをロード - 「共有データ」の

+1

これはすべて真ですが、 "SharedData"がシリアライズ可能でない場合は不可能です。シリアライズ可能でサイズは大きい場合はそれほど効率的ではありません。ドライバアプリケーションで作成された "SharedData"をSpark変換で直接使用すると、シリアル化され、**タスクごとにエグゼキュータ**に送信されます。 –

+0

@TzachZohar良い点は、タスクごとにエグゼキュータに送られるSharedDataです。はい、SharedDataのブロードキャスト変数を使用すると、これを回避できます。シリアライゼーションの要件は、クロージャー変数とブロードキャスト変数の両方の変数にも適用されますが、そうではありませんか? –

+1

はい、シリアル化の要件がブロードキャストにも適用されます。しかし、(私がそれを正しく読んでいるならば)OPが目指しているのは、私が言及した「静的な」初期化オプションではありません。 –

0

スパークのソリューションは、放送を使用しています。タスクがそのデータを使用する場合、Sparkはタスクが実行される前にそのデータが存在することを確認します。

また
object MySparkTransformation { 

    def transform(rdd: RDD[String], sc: SparkContext): RDD[Int] = { 
    val mySharedData: Map[String, Int] = loadDataOnce() 
    val broadcast = sc.broadcast(mySharedData) 
    rdd.map(r => broadcast.value(r)) 
    } 
} 

、あなたはドライバーのメモリにデータを読み込み、執行にそれを上に送信しないようにしたい場合は、あなたがに一度を埋めます値を作成するために、Scalaのobjectlazyの値を使用できます。たとえば、 JVM、これはSparkのケースではエグゼキュータごとに1回です。例:実際には

// must be an object, otherwise will be serialized and sent from driver 
object MySharedResource { 
    lazy val mySharedData: Map[String, Int] = loadDataOnce() 
} 

// If you use mySharedData in a Spark transformation, 
// the "local" copy in each executor will be used: 
object MySparkTransformation { 
    def transform(rdd: RDD[String]): RDD[Int] = { 
    // Spark won't include MySharedResource.mySharedData in the 
    // serialized task sent from driver, since it's "static" 
    rdd.map(r => MySharedResource.mySharedData(r)) 
    } 
} 

、あなたは各エグゼキュータにmySharedDataの一つのコピーを持っています。

+0

はい、私は既にブロードキャスト機能を認識していますが、使用したくない理由は、自分のタスクが実行可能ファイルを実行しているためです。コードはCプログラムでコンパイルされています。そのデータをHDFSファイルから直接ロードして共有メモリに入れて、それらのタスクで使用できるようにします。もちろん、私はCコードを少し修正する必要があります。この目的のために、レイジーヴァルの方が適切に見えます。だから、私はそれをチェックします。 – pythonic

関連する問題