2017-04-12 21 views
0

私はSQL以上の特定のスパークSQLでこれを行うことができる方法の範囲0,10,20,30,40,...80,90,100,110,120への集約やビンの値を希望スパークSQL - 範囲

+---------------+------+ 
|id    | value| 
+---------------+------+ 
|    1|118.0| 
|    2|109.0| 
|    3|113.0| 
|    4| 82.0| 
|    5| 60.0| 
|    6|111.0| 
|    7|107.0| 
|    8| 84.0| 
|    9| 91.0| 
|    10|118.0| 
+---------------+------+ 

ANSのようなテーブルを持っているに集計データフレーム?

現在のところ、私は範囲との結合に側面を持っていますが、これはやや不器用で非効率的です。

離散化された分位は実際には私が望むものではなく、むしろCUTの範囲です。

編集

https://github.com/collectivemedia/spark-ext/blob/master/sparkext-mllib/src/main/scala/org/apache/spark/ml/feature/Binning.scalaは、動的なビンを行うだろうが、私はむしろ、この指定された範囲が必要になります。

+0

私は答えを更新しました。それはあなたが探しているものですか? –

+0

かなり。最初の一見でかなりよく見えます。ありがとう。 –

+0

'org.apache.spark.ml.feature.Bucketizer'は明示的に指定された分割点の配列をとります。その後、出力列をグループ化することができます。 –

答えて

1

私は単なる初心者です。しかし、私は示唆を持っています。それがうまくいくか試してみてください。一般的なケースで

SELECT id, (value DIV 10)*10 FROM table_name ; 
+0

あなたはそれがscala SQL DSL APIでどのように機能するのか知っていますか? –

+0

私はそれについて全く知らない。 :-( –

+1

ところで 'df.select(( 'value divide 10).cast(" int ")* 10).show'はSQL APIで動作します –

5

、静的ビニングはorg.apache.spark.ml.feature.Bucketizerを用いて行うことができる。

val data = Array(
    (1, 118.0), (2, 109.0), (3, 113.0), (4, 82.0), (5, 60.0), 
    (6, 111.0), (7, 107.0), (8, 84.0), (9, 91.0), (10, 118.0) 
) 
val df = spark.createDataFrame(data).toDF("id", "value") 

import org.apache.spark.ml.feature.Bucketizer 

val splits = (0 to 12).map(_ * 10.0).toArray 

val bucketizer = new Bucketizer() 
    .setInputCol("value") 
    .setOutputCol("bucket") 
    .setSplits(splits) 

val bucketed = bucketizer.transform(df) 

bucketed.groupBy($"bucket").agg(count($"id").as("count")).show() 

結果:値が定義されたビンの外側にある場合

+------+-----+                
|bucket|count| 
+------+-----+ 
| 8.0| 2| 
| 11.0| 4| 
| 10.0| 2| 
| 6.0| 1| 
| 9.0| 1| 
+------+-----+ 

bucketizerエラーをスロー。アウトライヤーをキャプチャするには、分割ポイントをDouble.NegativeInfinityまたはDouble.PositiveInfinityと定義することは可能です。

Bucketizerは、適切なバケットをバイナリ検索することで、任意の分割で効率的に動作するように設計されています。あなたのような定期的なビンの場合、1は、単にような何かを行うことができます。bin_minbin_widthが最小ビンの左側の間隔をそれぞれビン幅です

val binned = df.withColumn("bucket", (($"value" - bin_min)/bin_width).cast("int")) 

+0

しかし、バケットが空であればグループ化は結果も返しませんので、すべてのバケット(そしてカウントが0の空のもの)のリストを見たい場合、これは結合なしで実行できます? –

+0

ビニング後の範囲で結合を実行するのはかなり効率的です。 –