2017-04-25 12 views
0

SparkデータAPIを使用してヘッダレスCSVファイルをロードする次のクラスがあります。Apache Spark Dataset API - スキーマStructTypeを受け付けません

問題は、SparkSessionが各列を定義するスキーマStructTypeを受け入れることができないということです。結果のデータフレームは、String型のunamed列である

public class CsvReader implements java.io.Serializable { 

public CsvReader(StructType builder) { 
     this.builder = builder; 
    } 
private StructType builder; 

SparkConf conf = new SparkConf().setAppName("csvParquet").setMaster("local"); 
// create Spark Context 
SparkContext context = new SparkContext(conf); 
// create spark Session 
SparkSession sparkSession = new SparkSession(context); 

Dataset<Row> df = sparkSession 
     .read() 
     .format("com.databricks.spark.csv") 
     .option("header", false) 
     //.option("inferSchema", true) 
     .schema(builder) 
     .load("/Users/Chris/Desktop/Meter_Geocode_Data.csv"); //TODO: CMD line arg 

public void printSchema() { 
    System.out.println(builder.length()); 
    df.printSchema(); 
} 

public void printData() { 
    df.show(); 
} 

public void printMeters() { 
    df.select("meter").show(); 
} 

public void printMeterCountByGeocode_result() { 
    df.groupBy("geocode_result").count().show(); 
} 

public Dataset getDataframe() { 
      return df; 
} 

} 

結果のデータフレームのスキーマは次のとおりです。

root 
|-- _c0: string (nullable = true) 
|-- _c1: string (nullable = true) 
|-- _c2: string (nullable = true) 
|-- _c3: string (nullable = true) 
|-- _c4: string (nullable = true) 
|-- _c5: string (nullable = true) 
|-- _c6: string (nullable = true) 
|-- _c7: string (nullable = true) 
|-- _c8: string (nullable = true) 
|-- _c9: string (nullable = true) 
|-- _c10: string (nullable = true) 
|-- _c11: string (nullable = true) 
|-- _c12: string (nullable = true) 
|-- _c13: string (nullable = true) 

デバッガは、「ビルダー」StrucTypeが正しく定義されていることを示しています。

0 = {[email protected]} "StructField(geocode_result,DoubleType,false)" 
1 = {[email protected]} "StructField(meter,StringType,false)" 
2 = {[email protected]} "StructField(orig_easting,StringType,false)" 
3 = {[email protected]} "StructField(orig_northing,StringType,false)" 
4 = {[email protected]} "StructField(temetra_easting,StringType,false)" 
5 = {[email protected]} "StructField(temetra_northing,StringType,false)" 
6 = {[email protected]} "StructField(orig_address,StringType,false)" 
7 = {[email protected]} "StructField(orig_postcode,StringType,false)" 
8 = {[email protected]} "StructField(postcode_easting,StringType,false)" 
9 = {[email protected]} "StructField(postcode_northing,StringType,false)" 
10 = {[email protected]} "StructField(distance_calc_method,StringType,false)" 
11 = {[email protected]} "StructField(distance,StringType,false)" 
12 = {[email protected]} "StructField(geocoded_address,StringType,false)" 
13 = {[email protected]} "StructField(geocoded_postcode,StringType,false)" 

は私が何をしています違う?どんな助力も大歓迎です!

答えて

2

変数Dataset<Row> dfを定義し、CSVファイルを読み取るコードブロックをgetDataframe()以下のように移動します。

private Dataset<Row> df = null; 

public Dataset getDataframe() { 
    df = sparkSession 
     .read() 
     .format("com.databricks.spark.csv") 
     .option("header", false) 
     //.option("inferSchema", true) 
     .schema(builder) 
     .load("src/main/java/resources/test.csv"); //TODO: CMD line arg 
     return df; 
} 

これで、以下のように呼び出すことができます。

CsvReader cr = new CsvReader(schema); 
    Dataset df = cr.getDataframe(); 
    cr.printSchema(); 

あなたのクラスを再設計することをお勧めします。 1つのオプションは、dfを他のメソッドにパラメータとして渡すことができることです。 Spark 2.0を使用している場合、SparkConfは必要ありません。 SparkSessionを作成するにはdocumentationを参照してください。

+0

優秀!とても感謝しています! –

0

ビルダーで初期化する場合は、dfをコンストラクター関数に入れる必要があります。メンバー関数に入れることもできます。

関連する問題