2017-11-14 11 views
0

問題: 私は(スカラ座を使用して)スパークでのキーの最も一般的な値をマッピングするために問題を抱えています。私はスパークデータフレームは、/データセットは、各キーの効率化のための最も一般的な値を探す方法

データセットは、その各キーである必要があり、スパーク変換とアクセス出力した後

よう
key1 = value_a 
key1 = value_b 
key1 = value_b 
key2 = value_a 
key2 = value_c 
key2 = value_c 
key3 = value_a 

あるRDDでそれを行っているが、DF/DS(sparksql)を効率的に行う方法がわかりません共通の値

出力

key1 = valueb 
key2 = valuec 
key3 = valuea 

私はマップとRDDに(key,value),countのグループによって削減することを試みた

RDD

をし、私が欲しいと(それがロジックになりますが、私は(データフレーム/データセット)sparksqlにこれを翻訳しカント:今まで試してみました0123:ネットワーク全体の最小シャッフル)

ここでは、RDD

ウィンドウイングを使用して
val data = List(

"key1,value_a", 
"key1,value_b", 
"key1,value_b", 
"key2,value_a", 
"key2,value_c", 
"key2,value_c", 
"key3,value_a" 

) 

val sparkConf = new SparkConf().setMaster("local").setAppName("example") 
val sc = new SparkContext(sparkConf) 

val lineRDD = sc.parallelize(data) 

val pairedRDD = lineRDD.map { line => 
val fields = line.split(",") 
(fields(0), fields(2)) 
} 

val flatPairsRDD = pairedRDD.flatMap { 
    (key, val) => ((key, val), 1) 
} 

val SumRDD = flatPairsRDD.reduceByKey((a, b) => a + b) 




val resultsRDD = SumRDD.map{ 
    case ((key, val), count) => (key, (val,count)) 
}.groupByKey.map{ 
    case (key, valList) => (name, valList.toList.sortBy(_._2).reverse.head) 
} 

resultsRDD.collect().foreach(println) 

DATAFRAME、のために私のコードですWindow.partitionBy("key", "value")count over the windowを集計しようとしています。そして私はあなたの質問からわかる内容によると、それぞれTHN sortingagg()

+0

と同じでなければなりませんWindow機能を使用して行うことができ、各キーのほとんどを繰り返し、値、およびaggregations

以下のように
import org.apache.spark.sql.expressions._ //import Window library def windowSpec = Window.partitionBy("key", "value") //defining a window frame for the aggregation import org.apache.spark.sql.functions._ //importing inbuilt functions df.withColumn("count", count("value").over(windowSpec)) // counting repeatition of value for each group of key, value and assigning that value to new column called as count .orderBy($"count".desc) // order dataframe with count in descending order .groupBy("key") // group by key .agg(first("value").as("value")) //taking the first row of each key with count column as the highest 

を選択し、あなたはする必要がありますキーでグループの後にウィンドウ関数を使用し、カウントで値をソートし、最初にランク付けされた行を取得します。これを確認することができますhttps://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group/ – eliasah

+1

@eliasahありがとう、それを探して –

+0

申し訳ありませんリンクを間違って!私はそれを更新しました。 – eliasah

答えて

1

はここにあなたがデータを読み込み、dataframe

だろう
val df = sc.textFile("path to the data file") //reading file line by line 
    .map(line => line.split("="))     // splitting each line by = 
    .map(array => (array(0).trim, array(1).trim)) //tuple2(key, value) created 
    .toDF("key", "value")      //rdd converted to dataframe which required import sqlContext.implicits._ 

に変換する必要があり

最初に何ができるかです

+----+-------+ 
|key |value | 
+----+-------+ 
|key1|value_a| 
|key1|value_b| 
|key1|value_b| 
|key2|value_a| 
|key2|value_c| 
|key2|value_c| 
|key3|value_a| 
+----+-------+ 

次のステップは、各キーの同じ値の繰り返しをカウントすることですしたがって、最終的な出力が

+----+-------+ 
|key |value | 
+----+-------+ 
|key3|value_a| 
|key1|value_b| 
|key2|value_c| 
+----+-------+ 
+0

答えに感謝、私はテストしますすぐに –

+0

は可能な限りコメントしました。 –

関連する問題