2016-08-01 9 views
0

Sparkの1.5メソッドdropDuplicates()を使用して、DataFrameコンテンツをフィルタリングしようとしています。 CSVソースに空のセルが含まれている場合(ソースファイルを提供します)、Spark throw ArrayIndexOutOfBoundsException。 私は何が間違っていますか?私はSpark SQLとDataFramesチュートリアルをバージョン1.6.2で読みましたが、DataFrameの操作について詳しくは説明しません。私はまた、 "Learning Spark、Lightning-Fast Big Data Analysis"という本を読んでいますが、Spark 1.5のために書かれています。私は、マニュアルへのリンクのいずれかの説明を得ることがうれしいです。 ありがとうございます。Apache Spark SQLコンテキストdropDuplicates

package data; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructType; 

import java.util.Arrays; 

public class TestDrop { 
    public static void main(String[] args) { 
     DropData dropData = new DropData("src/main/resources/distinct-test.csv"); 
     dropData.execute(); 
    } 
} 

class DropData{ 

    private String csvPath; 
    private JavaSparkContext sparkContext; 
    private SQLContext sqlContext; 

    DropData(String csvPath) { 
     this.csvPath = csvPath; 
    } 

    void execute(){ 
     initContext(); 
     DataFrame dataFrame = loadDataFrame(); 
     dataFrame.show(); 
     dataFrame.dropDuplicates(new String[]{"surname"}).show(); 
     //this one fails too: dataFrame.drop("surname") 
    } 

    private void initContext() { 
     sparkContext = new JavaSparkContext(new SparkConf().setMaster("local[4]").setAppName("Drop test")); 
     sqlContext = new SQLContext(sparkContext); 
    } 

    private DataFrame loadDataFrame() { 
     JavaRDD<String> strings = sparkContext.textFile(csvPath); 

     JavaRDD<Row> rows = strings.map(string -> { 
      String[] cols = string.split(","); 
      return RowFactory.create(cols); 
     }); 

     StructType st = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false), 
       DataTypes.createStructField("surname", DataTypes.StringType, true), 
       DataTypes.createStructField("age", DataTypes.StringType, true), 
       DataTypes.createStructField("sex", DataTypes.StringType, true), 
       DataTypes.createStructField("socialId", DataTypes.StringType, true))); 

     return sqlContext.createDataFrame(rows, st); 
    } 
} 
+0

他に何をお探しですか?あなたはいくつかのフィールドを宣言し、それがあなたと一致しない場合は例外を取得します。これは予想される動作です。不正なデータを除外するだけです。 – zero323

+0

それはどういう意味ですか?私は "姓"の列を持っています。私はSparkがJavaDocで書かれているように、この列によれば重複している行をフィルタリングすることを期待しています。 Btw、ここに私の[csvファイル](http://pastebin.com/NgE6NU8A) –

答えて

0

Object []の代わりに送信リストを作成すると、リスト内に1つの列が含まれます。それは私が間違っていたことです。

関連する問題