2016-12-14 4 views
0

私のCSVファイル:RDD操作

import org.apache.spark.api.java.JavaSparkContext; 

public class RddCsv 
{ 
    public static void main(String[] args) 
    { 
    SparkConf conf = new SparkConf().setAppName("CSV Reader").setMaster("local"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> allRows = sc.textFile("file:///home/kumar/Desktop/Eletricaldata/file8_2011.csv");//read csv file 
    System.out.println(allRows.take(5)); 
    } 
} 

私は新しい学習者sparkJava、 :CSVファイルを読み込むためのスパークコードダウンここ

YEAR,UTILITY_ID,UTILITY_NAME,OWNERSHIP,STATE_CODE,AMR_METERING_RESIDENTIAL,AMR_METERING_COMMERCIAL,AMR_METERING_INDUSTRIAL,AMR_METERING_TRANS,AMR_METERING_TOTAL,AMI_METERING_RESIDENTIAL,AMI_METERING_COMMERCIAL,AMI_METERING_INDUSTRIAL,AMI_METERING_TRANS,AMI_METERING_TOTAL,ENERGY_SERVED_RESIDENTIAL,ENERGY_SERVED_COMMERCIAL,ENERGY_SERVED_INDUSTRIAL,ENERGY_SERVED_TRANS,ENERGY_SERVED_TOTAL 
2011,34,City of Abbeville - (SC),M,SC,880,14,,,894,,,,,,,,,, 
2011,84,A & N Electric Coop,C,MD,135,25,,,160,,,,,,,,,, 
2011,84,A & N Electric Coop,C,VA,31893,2107,0,,34000,,,,,,,,,, 
2011,97,Adams Electric Coop,C,IL,8334,190,,,8524,,,,,0,,,,,0 
2011,108,Adams-Columbia Electric Coop,C,WI,33524,1788,709,,36021,,,,,,,,,, 
2011,118,Adams Rural Electric Coop, Inc,C,OH,7457,20,,,7477,,,,,,,,,, 
2011,122,Village of Arcade,M,NY,3560,498,100,,4158,,,,,,,,,, 
2011,155,Agralite Electric Coop,C,MN,4383,227,315,,4925,,,,,,,,,, 

そのCsvDatasetからPerticulerフィールド値を選択する方法と集計操作を実行する方法、およびデータセットを指定した変換とアクションを使用する方法。 perticularフィールドの値を選択する方法

+0

[Apache SparkでCSVファイルまたはJSONファイルを解析する方法](http://stackoverflow.com/questions/25362942/how-to-parsing-csv-or-json-file-with-apache-spark)の可能な複製) – Jobin

答えて

0
public static void main(String[] args) 
{ 
    SparkConf conf = new SparkConf().setAppName("CSV Reader").setMaster("local"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> allRows = sc.textFile("file:///home/abhishek/Desktop/file8_2011.csv"); 
    System.out.println(allRows.take(5)); 
    List<String> headers= Arrays.asList(allRows.take(1).get(0).split(",")); 
    String field="YEAR"; 
    //Skip Header 
    JavaRDD<String>dataWithoutHeaders=allRows.filter(x -> !(x.split(",")[headers.indexOf(field)]).equals(field)); 
    //Take one field as integer 
    JavaRDD<Integer> years=dataWithoutHeaders.map(x -> Integer.valueOf(x.split(",")[headers.indexOf(field)])); 
    //Aggregate operation getTotal aggregate() arguments are initial value for a partition,aggregating function for a partition 
    //and aggregating function for results from different partition 
    int total=years.aggregate(0,RddCsv::sum,RddCsv::sum); 
    for (Integer i:years.collect()){ 
     System.out.println("year :: "+i); 
    } 
    System.out.println(total); 
} 

private static int sum(int a,int b){ 
    return a+b; 
} 

これは基本的なプログラムです。詳細はsparkのjava apisを参照してください。

+0

が動作していない、compitimeエラーがこの行に来ています。集合(0、RddCsv :: sum、RddCsv :: sum); – kumar

+0

私は今すぐ実行しました。 –

+0

出力:16/12/14 16:26:31 INFO TaskSetManager:ローカルホストの25ミリ秒でステージ3.0(TID 3)で完了したタスク012 1/1 16/12/14 16:26:31 INFO TaskSchedulerImpl:プールからタスクがすべて完了したTaskSet 3.0を削除しました 16/12/14 16:26:31 INFO DAGScheduler:ジョブ3が終了しました:RDDCsv.java:32で収集、0.049729秒でした 年:: 2011 年:: 2011 年:: 2011 年:: 2011 年:: 2011 年:: 2011 2011年 年:: :: 2011 16/12/14午後04時26分31秒INFO SparkContext:起動停止( )シャットダウンフック –