2017-05-29 11 views
0

という名前の列があり、値の範囲が[0, 1]のデータフレームがあるとします。 [0, 0.1)[0.1, 0.2) ... [0.9, 1]などの範囲の列xの値で分割したいと考えています。それを行うための良いと速い方法がありますか?私はScalaでSpark 2を使用しています。Spark Scalaデータフレームをいくつかの値の範囲で分割します

更新:理想的には、各範囲のデータを含む10個の新しいデータフレームが必要です。

答えて

0

範囲を作成するためのPsidomのソリューション@の拡大、ここでは各範囲のためのデータフレームを作成するために、一つのアプローチです:

import org.apache.spark.sql.types.IntegerType 
val df = Seq(0.2, 0.71, 0.95, 0.33, 0.28, 0.8, 0.73).toDF("x") 
val df2 = df.withColumn("g", ($"x" * 10.0).cast(IntegerType)) 

df2.show 
+----+---+ 
| x| g| 
+----+---+ 
| 0.2| 2| 
|0.71| 7| 
|0.95| 9| 
|0.33| 3| 
|0.28| 2| 
| 0.8| 8| 
|0.73| 7| 
+----+---+ 

val dfMap = df2.select($"g").distinct. 
    collect. 
    flatMap(_.toSeq). 
    map(g => g -> df2.where($"g" === g)). 
    toMap 

dfMap.getOrElse(3, null).show 
+----+---+ 
| x| g| 
+----+---+ 
|0.33| 3| 
+----+---+ 

dfMap.getOrElse(7, null).show 
+----+---+ 
| x| g| 
+----+---+ 
|0.71| 7| 
|0.73| 7| 
+----+---+ 

[UPDATE]

あなたの範囲が不規則であれば、次のように、UDFでそれをラップし、その後、対応のInt範囲IDにダブルをマップする関数を定義することができます

val g: Double => Int = x => x match { 
    case x if (x >= 0.0 && x < 0.12345) => 1 
    case x if (x >= 0.12345 && x < 0.4834) => 2 
    case x if (x >= 0.4834 && x < 1.0) => 3 
    case _ => 99 // catch-all 
} 

val groupUDF = udf(g) 

val df = Seq(0.1, 0.2, 0.71, 0.95, 0.03, 0.09, 0.44, 5.0).toDF("x") 
val df2 = df.withColumn("g", groupUDF($"x")) 

df2.show 
+----+---+ 
| x| g| 
+----+---+ 
| 0.1| 1| 
| 0.2| 2| 
|0.71| 3| 
|0.95| 3| 
|0.03| 1| 
|0.09| 1| 
|0.44| 2| 
| 5.0| 99| 
+----+---+ 
+0

dfMapについてもう少し説明できますか?そして、あなたはこの方法が効率的だと思いますか?ありがとう! –

+0

'dfMap'を作成するための変換はキーとして別個の' g'を、値としてデータフレーム(対応する 'g'を持つ)を' Map'としてローカルな配列に別々の 'g'を集めてリストに変換しますリスト内のすべての要素に対してフィルタリングされたデータフレームの 'Map'を作成します。これらの変換はデータセット全体に適用されるため、安価ではありません。私は 'dfMap'で選択したデータフレームで広範な計算を行うことを計画しているなら、彼らがやる価値があると言いたいと思います。 –

+0

あなたが何を指しているのか分かります。私がしようとしていることは、各カテゴリ(ここでは 'g')のxの合計を計算することです。それを行う良い方法はありますか? –

0

あなたは二重の型指定された列を離散化することを意図している場合は、ちょうどこの(カラムは10個の個別のビンにカットされ、整数型にキャストし、その後10で列を乗算して)行う可能性があります:

import org.apache.spark.sql.types.IntegerType 

val df = Seq(0.32, 0.5, 0.99, 0.72, 0.11, 0.03).toDF("A") 
// df: org.apache.spark.sql.DataFrame = [A: double] 

df.withColumn("new", ($"A" * 10).cast(IntegerType)).show 
+----+---+ 
| A|new| 
+----+---+ 
|0.32| 3| 
| 0.5| 5| 
|0.99| 9| 
|0.72| 7| 
|0.11| 1| 
|0.03| 0| 
+----+---+ 
+1

私は10の新しいデータフレームをしたいし、さらにいくつかの計算を行います。私は '.filter()'を使うと思っていますが、これがうまくいくかどうかわかりませんが、これは十分高速です(データが膨大です)。 –

+0

新しいデータフレームを10個作成するのではなく、 'groupBy(" new ")'が必要な場合があります。 – Psidom

+0

私はそれを試みます! –

関連する問題