2017-09-11 12 views
0

スパークデータフレームを2つに分割し、それぞれのサブデータフレームの行番号を定義します。しかし、関数monotonically_increasing_idは元のデータフレームから行番号を定義していることがわかりました。ここで Pysparkling 2 reset monotonically_increasing_id from 1

は、私はPythonでやったことです:

# df is the original sparkframe 
splits = df.randomSplit([7.0,3.0],400) 

# add column rowid for the two subframes 
set1 = splits[0].withColumn("rowid", monotonically_increasing_id()) 
set2 = splits[1].withColumn("rowid", monotonically_increasing_id()) 

# check the results 
set1.select("rowid").show() 
set2.select("rowid").show() 

私は両方とも1〜5(または0〜4、はっきり覚えていない)二つのフレームのためのROWIDの最初の5つの要素を期待します:

set1: 1 2 3 4 5 
set2: 1 2 3 4 5 

しかし、私は実際に得たことは次のとおりです。

set1: 1 3 4 7 9 
set2: 2 5 6 8 10 

2つのサブフレーム行IDは中に実際に彼らの行IDですオリジナルのスパークフレームは新しいものではありません。

スパークの新生児として、私はこれがなぜ起こったのか、それをどう修正するのかについての助けを求めています。

答えて

0

まず最初に、どのバージョンのSparkを使用していますか? monotonically_increasing_idメソッドの実装が数回変更されました。私はSpark 2.0であなたの問題を再現することができますが、動作はスパーク2.2で異なっているようです。だから、新しいスパークリリースで修正されたバグかもしれません。言われていること

、あなたはmonotonically_increasing_idによって生成された値が連続を高めることを期待しない必要があります。あなたのコードでは、データフレームのパーティションが1つしかないと私は考えています。 http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html

によると、生成されたIDが単調に、なく連続で増加し、ユニークな されることが保証されます。 現在の実装では、 パーティションIDが上位31ビットに格納され、各レコード内のレコード番号が下位33ビットの パーティションに格納されます。データフレーム には10億未満のパーティションがあり、各パーティションには8個未満のレコードがあると仮定します。 億レコード。

例として、3つのパーティションがあり、それぞれが3つの レコードを持つDataFrameを考えてみましょう。この式は、次のIDを返します:0、1、2、 8589934592(1L < < 33)、8589934593、8589934594.

をだからあなたのコードは、ROWIDが連続して増加することを期待するべきではない場合。

また、キャッシュシナリオと障害シナリオも考慮する必要があります。 monotonically_increase_idが期待どおりに動作しても、連続して値を増やしても、それは動作しません。ノードに障害が発生したらどうなりますか?障害の発生したノード上のパーティションは、ソースまたは最後のキャッシュ/チェックポイントから再生成されます。キャッシュからの逸脱もまた問題を引き起こす。データフレームを生成した後、それをメモリにキャッシュすると仮定します。メモリから追い出された場合はどうなりますか?将来のアクションは、再びデータフレームを再生成しようとし、異なるROWIDを与えます。

+0

ありがとうございます。私はspark 2.0を使用しています。あなたはROWIDのラベルをつけることについて何か提案がありますか? – TooYoung

+0

私はすべてのシナリオで動作するソリューションを考えることはできません。 monotonically_increase_idが期待どおりに動作すると仮定すると、連続的に増加しますが、それでもあなたのためには機能しません。どうして?ノードに障害が発生したらどうなりますか?障害の発生したノード上のパーティションは、ソースまたは最後のキャッシュ/チェックポイントから再生成されます。障害シナリオはほとんどありません。しかしもう一つのシナリオはより一般的です。データフレームを生成した後、それをメモリにキャッシュすると仮定します。メモリから追い出された場合はどうなりますか?今後、データフレームを再生成しようとします。 – Lan