2017-07-08 14 views
-4

Spark Scalaでの2つのデータセットでの平均所得を計算するにはどうすればよいですか?Spark Scalaを使用して平均を計算してください

File1.csv

Ram, 30, Engineer, 40000 
Bala, 27, Doctor, 30000 
Hari, 33, Engineer, 50000 
Siva, 35, Doctor, 60000 

File2.csv(カラム2位置である)

Hari, Bangalore 
Ram, Chennai 
Bala, Bangalore 
Siva, Chennai 

上記ファイルがソートされていない(カラム4は給料です)。これらの2つのファイルを結合し、場所ごとの平均給与を見つける必要があります。私は以下のコードで試しましたが、それを作ることができません。

val salary = sc.textFile("File1.csv").map(e => e.split(",")) 
val location = sc.textFile("File2.csv").map(e.split(",")) 
val joined = salary.map(e=>(e(0),e(3))).join(location.map(e=>(e(0),e(1))) 
val joinedData = joined.sortByKey() 
val finalData = joinedData.map(v => (v._1,v._2._1._1,v._2._2)) 
val aggregatedDF = finalData.map(e=> e.groupby(e(2)).agg(avg(e(1))))  
aggregatedDF.repartition(1).saveAsTextFile("output.txt") 

どのように見えるかをコードとサンプル出力でお手伝いしてください。

多くのおかげ

答えて

0

私はデータフレームのAPIを使用することになり、このうまくいくはずです:

val salary = sc.textFile("File1.csv") 
       .map(e => e.split(",")) 
       .map{case Seq(name,_,_,salary) => (name,salary)} 
       .toDF("name","salary") 

val location = sc.textFile("File2.csv") 
       .map(e => e.split(",")) 
       .map{case Seq(name,location) => (name,location)} 
       .toDF("name","location") 

import org.apache.spark.sql.functions._ 

salary 
    .join(location,Seq("name")) 
    .groupBy($"location") 
    .agg(
    avg($"salary").as("avg_salary") 
) 
    .repartition(1) 
    .write.csv("output.csv") 
+0

最終的な出力は以下のようになりますか? + ------------------------ + |ロケーション| avg_salary | + ------------------------ + |バンガロール| 40000 | |チェンナイ| 500000 | + ------------------------ + – akrockz

+0

もう1つの疑問..給料の代わりに、600 * 200(length * width )、この場合の平均値はどのように求められますか?ラム600 * 200ハリ700 * 300など... – akrockz

0

あなたはこのような何か行うことができます:

val salary = sc.textFile("File1.csv").map(_.split(",").map(_.trim)) 
val location = sc.textFile("File2.csv").map(_.split(",").map(_.trim)) 
val joined = salary.map(e=>(e(0),e(3).toInt)).join(location.map(e=>(e(0),e(1)))) 
val locSalary = joined.map(v => (v._2._2, v._2._1)) 
val averages = locSalary.aggregateByKey((0,0))((t,e) => (t._1 + 1, t._2 + e), 
     (t1,t2) => (t1._1 + t2._1, t1._2 + t2._2)).mapValues(t => t._2/t._1) 

を、その後averages.take(10)が得られます:

res5: Array[(String, Int)] = Array((Chennai,50000), (Bangalore,40000)) 
+0

感謝を。 salaryの代わりに600 * 200(length * width)のような次元があるとします。この場合平均はどうやって見つけられますか? Ram 600 * 200 ハリ700 * 300 など... – akrockz

+0

サイズは文字列として指定されていますか?面積を平均化するか(長さに幅を掛けたもの)、またはこれらの各次元の平均を求めますか? – Harald

+0

私はこれらの各次元の平均をグループごとに計算したいと考えています。 – akrockz

1

を私はデータフレームを使用します。

: ファーストは、次のようなデータフレームを読みます
val salary = spark.read.option("header", "true").csv("File1.csv") 
val location = spark.read.option("header", "true").csv("File2.csv") 

ヘッダーがない場合は、オプションを "false"に設定し、withColumnRenamedを使用してデフォルト名を変更する必要があります。やる保存する

val avg = joined.groupby("location").agg(avg($"salary")) 

val joined = salary.join(location, "name") 

最後に平均計算を実行します。今すぐ登録ください

val salary = spark.read.option("header", "false").csv("File1.csv").toDF("name", "age", "job", "salary") 
val location = spark.read.option("header", "false").csv("File2.csv").toDF("name", "location") 

avg.repartition(1).write.csv("output.csv") 
+0

返信ありがとうございます。 salaryの代わりに600 * 200(length * width)のような次元があるとします。この場合平均はどうやって見つけられますか? RAM 600 * 200ハリ700 * 300など... – akrockz

+0

どういう意味ですか?名前ごとに複数の列がある、各名前の複数の出現を意味しますか? –

1

あなたが平均値を得るためにそれらを結合し、グループ、その後、データフレームとしてCSVファイルを読むことができます:平均寸法を計算するための

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
    "name", "age", "title", "salary" 
) 

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
    "name", "location" 
) 

import org.apache.spark.sql.functions._ 

val dfAverage = df1.join(df2, Seq("name")). 
    groupBy(df2("location")).agg(avg(df1("salary")).as("average")). 
    select("location", "average") 

dfAverage.show 
+-----------+-------+ 
| location|average| 
+-----------+-------+ 
|Bangalore |40000.0| 
| Chennai |50000.0| 
+-----------+-------+ 

[UPDATE]を:返信用

// file1.csv: 
Ram,30,Engineer,40000,600*200 
Bala,27,Doctor,30000,800*400 
Hari,33,Engineer,50000,700*300 
Siva,35,Doctor,60000,600*200 

// file2.csv 
Hari,Bangalore 
Ram,Chennai 
Bala,Bangalore 
Siva,Chennai 

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
    "name", "age", "title", "salary", "dimensions" 
) 

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
    "name", "location" 
) 

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.IntegerType 

val dfAverage = df1.join(df2, Seq("name")). 
    groupBy(df2("location")). 
    agg(
    avg(split(df1("dimensions"), ("\\*")).getItem(0).cast(IntegerType)).as("avg_length"), 
    avg(split(df1("dimensions"), ("\\*")).getItem(1).cast(IntegerType)).as("avg_width") 
). 
    select(
    $"location", $"avg_length", $"avg_width", 
    concat($"avg_length", lit("*"), $"avg_width").as("avg_dimensions") 
) 

dfAverage.show 
+---------+----------+---------+--------------+ 
| location|avg_length|avg_width|avg_dimensions| 
+---------+----------+---------+--------------+ 
|Bangalore|  750.0| 350.0| 750.0*350.0| 
| Chennai|  600.0| 200.0| 600.0*200.0| 
+---------+----------+---------+--------------+ 
+0

返事をありがとう。 salaryの代わりに600 * 200(length * width)のような次元があるとします。この場合平均はどうやって見つけられますか? RAM 600 * 200ハリ700 * 300など... – akrockz

+0

@akrockz、拡張答えをご覧ください。 –

+0

ありがとうございます@レオC ..これは私が探していたものです..最後のリクエスト..現在私は私のラップトップにスパークのセットアップを持っていない..私は私にメールを送信する場合は、入力データ? あまりにも多くをお詫び申し訳ありません..ありがとう – akrockz

関連する問題