問題: 私は(スカラ座を使用して)スパークでのキーの最も一般的な値をマッピングするために問題を抱えています。私はスパークデータフレームは、/データセットは、各キーの効率化のための最も一般的な値を探す方法
データセットは、その各キーである必要があり、スパーク変換とアクセス出力した後
ようkey1 = value_a
key1 = value_b
key1 = value_b
key2 = value_a
key2 = value_c
key2 = value_c
key3 = value_a
あるRDDでそれを行っているが、DF/DS(sparksql)を効率的に行う方法がわかりません共通の値
出力
key1 = valueb
key2 = valuec
key3 = valuea
私はマップとRDDに(key,value),count
のグループによって削減することを試みた
RDD
をし、私が欲しいと(それがロジックになりますが、私は(データフレーム/データセット)sparksqlにこれを翻訳しカント:今まで試してみました0123:ネットワーク全体の最小シャッフル)
ここでは、RDD
ウィンドウイングを使用してval data = List(
"key1,value_a",
"key1,value_b",
"key1,value_b",
"key2,value_a",
"key2,value_c",
"key2,value_c",
"key3,value_a"
)
val sparkConf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(sparkConf)
val lineRDD = sc.parallelize(data)
val pairedRDD = lineRDD.map { line =>
val fields = line.split(",")
(fields(0), fields(2))
}
val flatPairsRDD = pairedRDD.flatMap {
(key, val) => ((key, val), 1)
}
val SumRDD = flatPairsRDD.reduceByKey((a, b) => a + b)
val resultsRDD = SumRDD.map{
case ((key, val), count) => (key, (val,count))
}.groupByKey.map{
case (key, valList) => (name, valList.toList.sortBy(_._2).reverse.head)
}
resultsRDD.collect().foreach(println)
DATAFRAME、のために私のコードですWindow.partitionBy("key", "value")
でcount over the window
を集計しようとしています。そして私はあなたの質問からわかる内容によると、それぞれTHN sorting
とagg()
と同じでなければなりません
以下のようにWindow
機能を使用して行うことができ、各キーのほとんどを繰り返し、値、およびaggregations
を選択し、あなたはする必要がありますキーでグループの後にウィンドウ関数を使用し、カウントで値をソートし、最初にランク付けされた行を取得します。これを確認することができますhttps://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group/ – eliasah
@eliasahありがとう、それを探して –
申し訳ありませんリンクを間違って!私はそれを更新しました。 – eliasah