最近Spark 2.0にアップグレードされ、JSON文字列から簡単なデータセットを作成しようとしたときに奇妙な動作が発生しています。ここでは、簡単なテストケースです:SparkSessionは1回のアクションで2回実行されるのはなぜですか?
SparkSession spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
"{\"name\":\"tom\",\"title\":\"engineer\",\"roles\":[\"designer\",\"developer\"]}",
"{\"name\":\"jack\",\"title\":\"cto\",\"roles\":[\"designer\",\"manager\"]}"
));
JavaRDD<String> mappedRdd = rdd.map(json -> {
System.out.println("mapping json: " + json);
return json;
});
Dataset<Row> data = spark.read().json(mappedRdd);
data.show();
そして出力:
mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]}
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]}
mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]}
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]}
+----+--------------------+--------+
|name| roles| title|
+----+--------------------+--------+
| tom|[designer, develo...|engineer|
|jack| [designer, manager]| cto|
+----+--------------------+--------+
「マップ」機能は、私は1つのアクションだけを実行してるにもかかわらず、2回実行されているようです。私はSparkが遅れて実行計画を作成し、必要に応じて実行すると考えましたが、これによってJSONとしてデータを読み込んで何かを行うためには、計画を少なくとも2回実行する必要があるようです。
この単純なケースでは問題はありませんが、マップ機能が長時間実行されている場合は大きな問題になります。そうですか、何か不足していますか?