2016-08-12 13 views
1

最近Spark 2.0にアップグレードされ、JSON文字列から簡単なデータセットを作成しようとしたときに奇妙な動作が発生しています。ここでは、簡単なテストケースです:SparkSessionは1回のアクションで2回実行されるのはなぜですか?

SparkSession spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate(); 
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); 

JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
      "{\"name\":\"tom\",\"title\":\"engineer\",\"roles\":[\"designer\",\"developer\"]}", 
      "{\"name\":\"jack\",\"title\":\"cto\",\"roles\":[\"designer\",\"manager\"]}" 
     )); 

JavaRDD<String> mappedRdd = rdd.map(json -> { 
    System.out.println("mapping json: " + json); 
    return json; 
}); 

Dataset<Row> data = spark.read().json(mappedRdd); 
data.show(); 

そして出力:

mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]} 
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]} 
mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]} 
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]} 
+----+--------------------+--------+ 
|name|    roles| title| 
+----+--------------------+--------+ 
| tom|[designer, develo...|engineer| 
|jack| [designer, manager]|  cto| 
+----+--------------------+--------+ 

「マップ」機能は、私は1つのアクションだけを実行してるにもかかわらず、2回実行されているようです。私はSparkが遅れて実行計画を作成し、必要に応じて実行すると考えましたが、これによってJSONとしてデータを読み込んで何かを行うためには、計画を少なくとも2回実行する必要があるようです。

この単純なケースでは問題はありませんが、マップ機能が長時間実行されている場合は大きな問題になります。そうですか、何か不足していますか?

答えて

2

これは、DataFrameReaderのスキーマを提供していないために発生します。その結果、Sparkは、出力スキーマを推測するためにデータセットを熱心にスキャンする必要があります。スキーマ推論のための

  • 一度
  • 一度あなたがないようにしたい場合は、読者のためのスキーマを提供する必要があり

data.showを呼び出す:

mappedRddが、それは二度評価されるキャッシュされていませんので、 (Scalaの構文):

val schema: org.apache.spark.sql.types.StructType = ??? 
spark.read.schema(schema).json(mappedRdd) 
関連する問題