2017-10-19 25 views
1

Sparkは、たとえば、別のノードに障害が発生した場合に起動される新しいノードで実行する必要のある作業をどのように配布するかを知っています。Spark Datasetの選択的な再計算

これが他の使用例で利用できるかどうかを知りたいと思います。

私には、変換とアクションのツリーがあるとします。データセット/データフレームの1つが更新された場合(たとえば、新しいファイルがインポートされた場合)はどうなりますか。この場合、この変更に影響を及ぼし、リンクされている変更とアクションだけを繰り返したいと思います。関連しない他の変換やアクションは、影響を受けていないのでキャッシュから使用する必要があります。

ここで、私はこれらのデータフレームと変換とアクションのほんの少ししか手に入れることはできません。しかし、私は数十以上のそのようなDFを持っていて、私がここで私を助けることができるフレームワークの中にスパークが組み込まれているかどうかを理解しようとしています。ここで

は私のコードの例です:いくつかの変換のために今

val carLines = spark 
    .read 
    .option("header", "true") 
    .schema(carLineSchema) 
    .csv("src/test/resources/cars") 

val ageMappingFunction: Int => String = (age: Int) => if (age > 80) "old" else "young" 
// 
val _age = udf.register("_age", ageMappingFunction) 

val personLines = spark 
    .read 
    .option("header", "true") 
    .schema(personLineSchema) 
    .csv("src/test/resources/persons") 
    .withColumn("_age", _age($"age")) 

val accidentsLines = spark 
    .read 
    .option("header", "true") 
    .schema(accidentLineSchema) 
    .csv("src/test/resources/accidents") 

val carOwners = personLines 
    .withColumnRenamed("id", "driver_id") 
    .join(carLines, Seq("driver_id"), "left") 
    .withColumnRenamed("id", "car_id") 
    .withColumnRenamed("car_make", "car_maker") 
    .withColumnRenamed("driver_id", "id") 

val accidentsWithDrivers = accidentsLines 
    .join(personLines.withColumnRenamed("id", "driver_id"), "driver_id") 

val accidentsPerDriverID = accidentsWithDrivers 
    .groupBy("driver_id") 
    .agg(Map(
    "name" -> "count" 
)) 
    .withColumnRenamed("count(name)", "accident_count") 
    .withColumnRenamed("driver_id", "id") 

val finalTable = carOwners 
    .join(numberOfCarsPerDriver, Seq("id", "name", "age", "_age")) 
    .join(accidentsPerDriverID, "id") 

それから私はいくつかのアクションを(簡単にするために、私は 'ショー' を使用します)を行います

carOwners.show(true) 
numberOfCarsPerDriver.show(true) 
finalTable.show(true) 

accidentsLinesが変更されていてもcarLinesでない場合またはpersonLines。キャッシュされた値がcarLinespersonLinescarOwners変換を実行できますか?

他の言葉: 私はsparkクラスタ内でメモリ内に保持したいと仮定して、RDD#cache()apiを使って別のドライバの実行間で生き残ることはできますか?

答えて

1

私はjob-serverを使用またはApacheのIgniteからIgniteRDDサポートを使用するか必要が判明:

//WRITE 
val igniteContext = new IgniteContext(spark.sparkContext, "ignite-config.xml", true) 
val schema = dataframe.schema 
val rdd = dataframe.rdd 
igniteContext.ignite().getOrCreateCache("ignite-cache").put("schema", schema) 
igniteContext.fromCache(name).saveValues(rdd) 

//READ 
val schema = igniteContext.ignite() 
    .getOrCreateCache[String, StructType]("ignite-cache") 
    .get("schema") 
    .asInstanceOf[StructType] 

    val igniteRdd: IgniteRDD[String, Row] = igniteContext.fromCache(name) 
    val rdd = igniteRdd.map(a => a._2) 
    val dataframe = spark.createDataFrame(rdd, schema)