Sparkは、たとえば、別のノードに障害が発生した場合に起動される新しいノードで実行する必要のある作業をどのように配布するかを知っています。Spark Datasetの選択的な再計算
これが他の使用例で利用できるかどうかを知りたいと思います。
私には、変換とアクションのツリーがあるとします。データセット/データフレームの1つが更新された場合(たとえば、新しいファイルがインポートされた場合)はどうなりますか。この場合、この変更に影響を及ぼし、リンクされている変更とアクションだけを繰り返したいと思います。関連しない他の変換やアクションは、影響を受けていないのでキャッシュから使用する必要があります。
ここで、私はこれらのデータフレームと変換とアクションのほんの少ししか手に入れることはできません。しかし、私は数十以上のそのようなDFを持っていて、私がここで私を助けることができるフレームワークの中にスパークが組み込まれているかどうかを理解しようとしています。ここで
は私のコードの例です:いくつかの変換のために今
val carLines = spark
.read
.option("header", "true")
.schema(carLineSchema)
.csv("src/test/resources/cars")
val ageMappingFunction: Int => String = (age: Int) => if (age > 80) "old" else "young"
//
val _age = udf.register("_age", ageMappingFunction)
val personLines = spark
.read
.option("header", "true")
.schema(personLineSchema)
.csv("src/test/resources/persons")
.withColumn("_age", _age($"age"))
val accidentsLines = spark
.read
.option("header", "true")
.schema(accidentLineSchema)
.csv("src/test/resources/accidents")
val carOwners = personLines
.withColumnRenamed("id", "driver_id")
.join(carLines, Seq("driver_id"), "left")
.withColumnRenamed("id", "car_id")
.withColumnRenamed("car_make", "car_maker")
.withColumnRenamed("driver_id", "id")
:
:val accidentsWithDrivers = accidentsLines
.join(personLines.withColumnRenamed("id", "driver_id"), "driver_id")
val accidentsPerDriverID = accidentsWithDrivers
.groupBy("driver_id")
.agg(Map(
"name" -> "count"
))
.withColumnRenamed("count(name)", "accident_count")
.withColumnRenamed("driver_id", "id")
val finalTable = carOwners
.join(numberOfCarsPerDriver, Seq("id", "name", "age", "_age"))
.join(accidentsPerDriverID, "id")
それから私はいくつかのアクションを(簡単にするために、私は 'ショー' を使用します)を行います
carOwners.show(true)
numberOfCarsPerDriver.show(true)
finalTable.show(true)
accidentsLines
が変更されていてもcarLines
でない場合またはpersonLines
。キャッシュされた値がcarLines
とpersonLines
のcarOwners
変換を実行できますか?
他の言葉: 私はsparkクラスタ内でメモリ内に保持したいと仮定して、RDD#cache()apiを使って別のドライバの実行間で生き残ることはできますか?