スパークのソリューションは、放送を使用しています。タスクがそのデータを使用する場合、Sparkはタスクが実行される前にそのデータが存在することを確認します。
また
object MySparkTransformation {
def transform(rdd: RDD[String], sc: SparkContext): RDD[Int] = {
val mySharedData: Map[String, Int] = loadDataOnce()
val broadcast = sc.broadcast(mySharedData)
rdd.map(r => broadcast.value(r))
}
}
、あなたはドライバーのメモリにデータを読み込み、執行にそれを上に送信しないようにしたい場合は、あなたがに一度を埋めます値を作成するために、Scalaのobject
でlazy
の値を使用できます。たとえば、 JVM、これはSparkのケースではエグゼキュータごとに1回です。例:実際には
// must be an object, otherwise will be serialized and sent from driver
object MySharedResource {
lazy val mySharedData: Map[String, Int] = loadDataOnce()
}
// If you use mySharedData in a Spark transformation,
// the "local" copy in each executor will be used:
object MySparkTransformation {
def transform(rdd: RDD[String]): RDD[Int] = {
// Spark won't include MySharedResource.mySharedData in the
// serialized task sent from driver, since it's "static"
rdd.map(r => MySharedResource.mySharedData(r))
}
}
、あなたは各エグゼキュータにmySharedData
の一つのコピーを持っています。
これはすべて真ですが、 "SharedData"がシリアライズ可能でない場合は不可能です。シリアライズ可能でサイズは大きい場合はそれほど効率的ではありません。ドライバアプリケーションで作成された "SharedData"をSpark変換で直接使用すると、シリアル化され、**タスクごとにエグゼキュータ**に送信されます。 –
@TzachZohar良い点は、タスクごとにエグゼキュータに送られるSharedDataです。はい、SharedDataのブロードキャスト変数を使用すると、これを回避できます。シリアライゼーションの要件は、クロージャー変数とブロードキャスト変数の両方の変数にも適用されますが、そうではありませんか? –
はい、シリアル化の要件がブロードキャストにも適用されます。しかし、(私がそれを正しく読んでいるならば)OPが目指しているのは、私が言及した「静的な」初期化オプションではありません。 –