2016-02-29 7 views
20

この質問は新しいものではありませんが、私はSparkで驚くべき動作を見出しています。行IDの列をDataFrameに追加する必要があります。私はDataFrameメソッドmonotonically_increasing_id()を使用して、それは私にユニーク行ID(それは連続していないが、一意である)の追加の列を与えます。Spark DataFrameに行IDの永続列を追加するにはどうすればよいですか?

私が抱えている問題は、DataFrameをフィルタリングすると、結果のDataFrameの行IDが再割り当てされるということです。 2つのDataFramesを以下に示します。

  • 最初のものは、以下のように行IDを持つ最初のデータフレームが追加された:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
  • 第DATAFRAMEはdf.filter(col("P"))介しCOL Pにフィルタリングした後に得られたものです。

問題は、初期データフレームに5たCUSTID 169のためのROWID、によって示されるが、CUSTID 169を除外した場合、ROWID(5)custmId 773に再割り当てされたことフィルタリング後です!なぜこれがデフォルトの動作であるのかわかりません。

私はrowIdsを「粘着性」にしたいと思います。私がDataFrameから行を削除した場合、IDを再利用したくないので、行を削除しすぎてしまいます。それは可能ですか? monotonically_increasing_idメソッドからこの動作を要求するフラグはありません。

+---------+--------------------+-------+ 
| custId | features| P |rowId| 
+---------+--------------------+-------+ 
|806  |[50,5074,...| true| 0| 
|832  |[45,120,1...| true| 1| 
|216  |[6691,272...| true| 2| 
|926  |[120,1788...| true| 3| 
|875  |[54,120,1...| true| 4| 
|169  |[19406,21...| false| 5| 

after filtering on P: 
+---------+--------------------+-------+ 
| custId| features| P |rowId| 
+---------+--------------------+-------+ 
|  806|[50,5074,...| true| 0| 
|  832|[45,120,1...| true| 1| 
|  216|[6691,272...| true| 2| 
|  926|[120,1788...| true| 3| 
|  875|[54,120,1...| true| 4| 
|  773|[3136,317...| true| 5| 
+1

2つのサンプルデータフレームを生成するための完全なコードを共有できますか?それは価値があるため、「独立した」マップステージを再配置できるSQLクエリの最適化による可能性があります。 –

+0

ハメル、実際に私が投稿したもの以外の変形や行動はありません。表示されるデータフレームは、df.show()の結果です。この動作を非常に簡単に再現し、データフレームを作成し、上記のように行ID列を追加して、それにランダムなブール列を追加することができます。次に、その列をフィルタリングして、単調に増加していく行IDが、私が記述しているように「再利用」されている様子を見てください。 – Kai

+0

@カイ私はそれを再現する最も簡単な方法は、単一のパーティションだけを使うことだと、実際に付け加えます。 – zero323

答えて

11

スパーク2.0

  • これは、問題はSPARK-14241とスパーク2.0で解決されています。

  • 別の同様の問題は、あなたの経験はかなり微妙ですが、単純な事実monotonically_increasing_idに減少させることができる

スパーク1.1

問題があるSPARK-14393とスパーク2.1で解決されました非常に醜い機能。それは明らかに純粋ではなく、その価値はあなたのコントロールから完全に外れているものに依存します。

パラメータを使用しないため、オプティマイザの観点からは、それが呼び出されたときは問題ではなく、他のすべての操作の後にプッシュできます。したがって、あなたが見る行動。

コードをご覧になると、これはの式をNondeterministicに拡張することによって明示的にマークされます。

私には優雅な解決策はありませんが、これを処理できる方法の1つはフィルタリングされた値に人為的な依存関係を追加することです。例えば、このようなUDFと:

from pyspark.sql.types import LongType 
from pyspark.sql.functions import udf 

bound = udf(lambda _, v: v, LongType()) 

(df 
    .withColumn("rn", monotonically_increasing_id()) 
    # Due to nondeterministic behavior it has to be a separate step 
    .withColumn("rn", bound("P", "rn")) 
    .where("P")) 

一般に、RDDzipWithIndexを使用してインデックスを追加しDataFrameに戻し変換する掃除機であってもよいです。


は、*上記に示した解決策は、もはやのPythonのUDFは、実行計画の最適化の対象であるスパーク2.xで有効溶液(も必須)ではありません。

3

これを再現できませんでした。私はSpark 2.0を使用していますが、おそらく動作が変更されているか、私はあなたと同じことをやっていません。

val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true)) 
.toDF("name", "value","flag") 
.withColumn("rowd", monotonically_increasing_id()) 

df.show 

val df2 = df.filter(col("flag")=== true) 

df2.show 

df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields] 
+-----+-----+-----+----+ 
| name|value| flag|rowd| 
+-----+-----+-----+----+ 
| one| 1| true| 0| 
| two| 2|false| 1| 
|three| 3| true| 2| 
| four| 4| true| 3| 
+-----+-----+-----+----+ 
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields] 
+-----+-----+----+----+ 
| name|value|flag|rowd| 
+-----+-----+----+----+ 
| one| 1|true| 0| 
|three| 3|true| 2| 
| four| 4|true| 3| 
+-----+-----+----+----+ 
+0

上記のコードで問題が見つかりませんでした – thebluephantom

+0

** Javaの** monotonically_increasing_id()**に相当するもの – Yugerten

+0

org.apache.spark.sql.functionsパッケージはJava API https:// sparkで利用できます。 apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#monotonicallyIncreasingId-- – Davos

1

monotonically_increasing_id(のシフト評価を回避するために)、あなたはディスク、および再読み込みにデータフレームを書いてみてください。次に、id列は、パイプラインのある時点で動的に計算されるのではなく、単に読み取られているデータフィールドになりました。それはかなり醜い解決策ですが、私は迅速なテストを行ったときに働きました。

1

これは私のために働いた。別のID列を作成し、ウィンドウ関数row_numberを使用しました

import org.apache.spark.sql.functions.{row_number} 
import org.apache.spark.sql.expressions.Window 

val df1: DataFrame = df.withColumn("Id",lit(1)) 

df1 
.select(
..., 
row_number() 
.over(Window 
.partitionBy("Id" 
.orderBy(col("...").desc)) 
) 
.alias("Row_Nbr") 
) 
関連する問題