2016-08-05 11 views
1

私はスパークするのが初めてです。私はドキュメントの基本的な例のいくつかに従っています。 (簡易版、本物の1はほぼ40,000行を持っている)sparkでcsvファイルをループするための最善のアプローチ

date,category 
19900108,apples 
19900108,apples 
19900308,peaches 
19900408,peaches 
19900508,pears 
19910108,pears 
19910108,peaches 
19910308,apples 
19910408,apples 
19910508,apples 
19920108,pears 
19920108,peaches 
19920308,apples 
19920408,peaches 
19920508,pears 
Scalaのコードのこのビットは、カテゴリをカウントするために正常に動作

val textFile = sc.textFile("sample.csv") 
textFile.filter(line => line.contains("1990")).filter(line =>line.contains("peaches")).count() 
textFile.filter(line => line.contains("1990")).filter(line => line.contains("apples")).count() 
textFile.filter(line => line.contains("1990")).filter(line => line.contains("pears")).count() 

を合計:

は、私はこのようなCSVファイルを持っていますそれぞれの行をループして、年ごとにカテゴリ合計を追加するのに最適な方法は何ですか?

date,apples,peaches,pears 
1990,2,2,1 
1991,3,1,1 
1992,1,2,2 

助けていただければ幸いです。

+2

可能重複[ピボットスパークデータフレーム(http://stackoverflow.com/questions/30244910/pivot-spark-dataframe) – zero323

答えて

1
//Create Spark SQL Context  
val sqlContext = new SQLContext(sc) 

//read csv 
var df = sqlContext.read.format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .load("sample.csv") 
df = df.withColumn("year", df.col("date").substr(0,4)) 
df = df.groupBy("year").pivot("category").agg("category"->"count") 
df.withColumn("total", df.col("apples").+(df.col("peaches")).+(df.col("pears"))).show() 

//Dependency required: 
<dependency> 
     <groupId>com.databricks</groupId> 
     <artifactId>spark-csv_2.10</artifactId> 
     <version>1.4.0</version> 
</dependency> 
+0

[2.0スパーク](https://spark.apache.org/releases /spark-release-2-0-0.html#new-features)が最近リリースされました。現在、ネイティブCSVサポートを提供しています:) – NikoNyrh

+0

あなたの答えはVenkatNに感謝します。私は過去数日のうちに天気の下にいましたので、私はこれをもっと早く見ることができませんでした。 このスクリプトを実行すると、次の行に "error reassignment val"というメッセージが表示されます。 df = df.withColumn( "year"、df.col( "date")。substr(0,4)) With Spark 2,0は、私がdatabricks csvパッケージを含まないことを意味しますか? – ronmac

+0

私は同じ理由でvalの代わりに "df"をvarと宣言しました。 "error reassignment val"を避けるには、同じことをしなければならないか、新しい変数をdfに再割り当てする代わりに初期化することができます: 'val df2 = df.withColumn(" year "、df.col(" ( "カテゴリ" - > "カウント")。 – VenkatN

関連する問題