2017-11-04 22 views
0

ここに私のJava sparkコードがあります。これはSpark CSVデータ形式です。spark rdd生成時にCSVファイルの列を処理する方法は?

"f_name","l_name","job","gender","age","salary" 
"Elsdon","Jaycob","Java programmer","male",43,2000 
"Tamsen","Brittany","Java programmer","female",23,1500 
"Floyd","Donny","Java programmer","male",33,1800 

そして私は、上記のデータに

public class Person implements Serializable { 

    private String firstName; 
    private String lastName; 
    private String job; 
    private String gender; 
    private int salary; 
    private int age; 

    public Person(String firstName, String lastName, String job, String gender, int age, int salary) { 

     this.firstName = firstName; 
     this.lastName = lastName; 
     this.job = job; 
     this.gender = gender; 
     this.age = age; 
     this.salary = salary; 
     } 
... getter and setter method. 

が含まれており、以下のコードは、スパークJavaクライアントとJavaのRDDを生成しようとPersonクラスを生成します。

SparkConf sc = new SparkConf().setAppName("SparkTest").setMaster("local[*]"); 
JavaSparkContext jsc = new JavaSparkContext(sc); 
JavaRDD<String> rdd_text = jsc.textFile("file:///" + srcDir + srcFile); 


String[] header = rdd_text.map(line -> line.split(",")).first(); 
System.out.println(header[4]); // "age" is printed 
JavaRDD<Person> persons = rdd_text.filter(line -> line.split(",")[4] != header[4]).map(
    line -> { 
     String[] info = line.split(","); 

     System.out.println(info[4]); //43,23,33,"age" are printed 

     Person p = new Person(info[0], info[1], info[2], info[3], 
            Integer.parseInt(info[4]), Integer.parseInt(info[5])); 

    return p; 
}); 

System.out.println(persons.collect()); 

のSystem.out.println(インフォ[4])のコード行が印刷:

43 
23 
33 
"age" 

し、それが次の例外をスローし、

java.lang.NumberFormatException: For input string: ""age"" 
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) 
    at java.lang.Integer.parseInt(Integer.java:569) 
    at java.lang.Integer.parseInt(Integer.java:615) 
    at com.aaa.spark.JavaClient.lambda$2(JavaClient.java:33) 
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

は、私がどのライン見当がつかないエラーコードと理由があります。 System.out.println(info[4])は "age"文字列値を出力します。 (情報[4])ヘッダを持つ

jsc.textFile("file:///" + srcDir + srcFile); 

ファイルの最初の行もInteger.parseIntによって処理された(「年齢」の値を持つ)とこのような理由:

答えて

1

あなたは、通常のテキストファイルではなく、CSVなどのファイルを読みますエラーのために。

スパークが解析CSVのための具体的な方法を持っている、あなたがそれらを使用することができます。

https://github.com/databricks/spark-csv

最新のスパークバージョンでは、CSVは、ボックスから解析している、マニュアルを確認してください。

関連する問題