私は、分割された寄木細工のファイルに書き込むときに、DataFrameの特定の行が削除される問題があります。ここで AWSのSpark 2.2.0 EMRによる寄木細工への書き込み
は私の手順は次のとおりです。-
指定されたスキーマとS3から
- 読むCSVデータファイル
- '日付' 列でパーティション(DATETYPE)
mode=append
と寄木として
date='2012-11-22'
の特定のパーティションでは、CSVファイル、DataFrameファイル、および寄木細工ファイルをカウントします。ここで
はpysparkを使用して再現するいくつかのコードです:
logs_df = spark.read.csv('s3://../logs_2012/', multiLine=True, schema=get_schema()')
logs_df.filter(logs_df.date=='2012-11-22').count() # results in 5000
logs_df.write.partitionBy('date').parquet('s3://.../logs_2012_parquet/', mode='append')
par_df = spark.read.parquet('s3://.../logs_2012_parquet/')
par_df.filter(par_df.date=='2012-11-22').count() # results in 4999, always the same record that is omitted
私はあまりにもHDFSへの書き込みをしようとしていると、結果は同じです。これは、複数のパーティションで発生します。デフォルト/ヌルパーティションにはレコードがありません。上記のlogs_df
は正確で正確です。
2番目の実験では、分割されていない寄木細工のファイルを書きました。
logs_df.write.parquet('s3://.../logs_2012_parquet/', mode='append')
この寄木細工のセットをロードし、上記のようにカウントを実行date='2012-11-22'
及び他の日付のための5000の正しい結果が得られた:上記のコードでの唯一の違いは、partitionBy()
の抜けがありました。モードをoverwrite
に設定するか、または設定しない(デフォルトを使用する)と同じデータが失われます。
私の環境は次のとおりです。
- EMR 5.9.0
- スパーク2.2.0
- のHadoopディストリビューション:EMRFS一貫したビューではなく、両方を試してみましたアマゾン2.7.3
- 。しかし、ほとんどのテストでは、S3の一貫性の問題を避けるためにHDFSへの書き込みが行われていました。
Sparkを使用して、修正や回避策、または寄木細工のファイルに変換する方法をお勧めします。
おかげで、
編集:私は第二の実験を再現することができませんでした。つまり、ParquetまたはJSONに書き込むときに、パーティション化されたパーティションとパーティション化されていないパーティションの両方でレコードが削除されるようです。
いつも同じレコードがありますか?データフレーム内のすべてのレコードに対して日付が明確に定義されていますか? –
はい、それは同じものです。日付の形式が正しくありません。 DataFrameは 'filter()'を使ってレコードを識別します。それで、それが間違っているpartitionByを使用するときは、常に書き込みステップにあります。 – AtharvaI
hdfsやs3への書き込み時に特定の行がドロップされる理由はありますか?私はSTRINGとしてもこの列を試しました。理由を理解できません。 – AtharvaI