2016-12-15 4 views
0

あなたはこのようになります簡単なデータフレームがある場合:シンプルなロールダウンスパークデータフレーム付き(スカラ)

val n = sc.parallelize(List[String](
    "Alice", null, null, 
    "Bob", null, null, 
    "Chuck" 
    )).toDF("name") 

を次のようになります。

//+-----+ 
//| name| 
//+-----+ 
//|Alice| 
//| null| 
//| null| 
//| Bob| 
//| null| 
//| null| 
//|Chuck| 
//+-----+ 

がどのようにデータフレームを使用することができますROLL-ダウン機能を得るために:

//+-----+ 
//| name| 
//+-----+ 
//|Alice| 
//|Alice| 
//|Alice| 
//| Bob| 
//| Bob| 
//| Bob| 
//|Chuck| 
//+-----+ 

注:すべての必要な輸入を明記してください、私はこれらの疑いが含まれます:

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.{WindowSpec, Window} 

注:私は模倣しようとしたいくつかのサイトは以下のとおりです。

http://xinhstechblog.blogspot.com/2016/04/spark-window-functions-for-dataframes.html

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

私が気付いて、私は過去にこのようなものに遭遇しましたSparkのバージョンは異なるでしょう。私はクラスタで1.5.2を使用しています(このソリューションがより便利です)。ローカルエミュレーションでは2.0です。私は1.5.2互換のソリューションを好む。

また、私は離れて直接SQLを書いてから取得したいのですが - あなたは値のグループ化を可能にする別の列を持っている場合は、ここで提案ですsqlContext.sql(...)

+0

ヌル値の配置を許可する列がありますか?あなたが指定した例では、DataFramesが配布されているため、順序が決定的になることはありません。したがって、コードを実行するたびにnull値が別の場所に表示されます。いくつかの順序付け(またはグループ化)を定義できる別の列がある場合は、解決策がそれほど難しくありません。 –

答えて

1

を使用しないよう:オン

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.Window 
import sqlContext.implicits._ 

val df = Seq(
    (Some("Alice"), 1), 
    (None, 1), 
    (None, 1), 
    (Some("Bob"), 2), 
    (None, 2), 
    (None, 2), 
    (Some("Chuck"), 3) 
).toDF("name", "group") 

val result = df.withColumn("new_col", min(col("name")).over(Window.partitionBy("group"))) 

result.show() 

+-----+-----+-------+ 
| name|group|new_col| 
+-----+-----+-------+ 
|Alice| 1| Alice| 
| null| 1| Alice| 
| null| 1| Alice| 
| Bob| 2| Bob| 
| null| 2| Bob| 
| null| 2| Bob| 
|Chuck| 3| Chuck| 
+-----+-----+-------+ 

一方で、順序付けを許可するがグループ化しない列がある場合、その解決策は少し難しくなります。私の最初のアイデアはサブセットを作成してから参加することです:

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.Window 
import sqlContext.implicits._ 

val df = Seq(
    (Some("Alice"), 1), 
    (None, 2), 
    (None, 3), 
    (Some("Bob"), 4), 
    (None, 5), 
    (None, 6), 
    (Some("Chuck"), 7) 
).toDF("name", "order") 

val subset = df 
    .select("name", "order") 
    .where(col("name").isNotNull) 
    .withColumn("next", lead("order", 1).over(Window.orderBy("order"))) 

val partial = df.as("a") 
    .join(subset.as("b"), col("a.order") >= col("b.order") && (col("a.order") < subset("next")), "left") 
val result = partial.select(coalesce(col("a.name"), col("b.name")).as("name"), col("a.order")) 

result.show() 

+-----+-----+ 
| name|order| 
+-----+-----+ 
|Alice| 1| 
|Alice| 2| 
|Alice| 3| 
| Bob| 4| 
| Bob| 5| 
| Bob| 6| 
|Chuck| 7| 
+-----+-----+ 
+0

ダニエルありがとう。残念ながら、データはこの例で提供したデータよりも広く、最初の選択肢をあまり望ましくないようにするグループ化はありません。単調に増加するカウンタ(列)を追加することで2番目のオプションが簡単に実行できるように見えますが、2番目のオプションは、SparkとScalaで予想されるよりはるかに複雑なソリューションです。 – codeaperature

+0

@codeaperature 2番目の解決法は単純ではなく、おそらくもっと良い方法があると私は同意しますが、問題に隣接する依存関係が含まれている場合、通常Sparkが最適ではないと言うことができますまたは従来の反復ロジックを使用しているため、問題の単純な解決策がない可能性があります。 –

+0

サイドバー:最後の項目(name)がnullまたはNoneの場合、これに小さなバグがあります。 – codeaperature

関連する問題