2016-11-25 1 views
1

私はJSONファイルからデータを読み込み、それがファイルを寄木細工への書き込みにスパークジョブを書いていますが存在する場合(寄木細工のファイルで)レコードを更新しますが、以下の例のコードです:スパークすでに

DataFrame dataFrame = new DataFrameReader(sqlContext).json(textFile); 
    dataFrame = dataFrame.withColumn("year", year(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp")))); 
    dataFrame = dataFrame.withColumn("month", month(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp")))); 
    dataFrame.write().mode(SaveMode.Append).partitionBy("year", "month").parquet("<some_path>"); 

JSONファイルはたくさんのjsonレコードで構成されています。レコードが既に存在する場合は、そのレコードを寄木張りで更新します。 Appendモードを試しましたが、レコードレベルではなくファイルレベルで動作しているようです(つまり、ファイルがすでに存在する場合は、最後に書き込みます)。したがって、同じファイルに対してこのジョブを実行すると、レコードが複製されます。

データフレームの行IDを一意のキーとして指定する方法はありますか?それが既に存在する場合、レコードを更新するようsparkに依頼してください。すべての保存モードは、ファイルではなくレコードをチェックしているようです。

答えて

0

parquetはIDで更新するためにファイルを読み込み、新しいファイルにデータを書き換えるよりもメモリ内の値を更新する必要があります。既存のファイル)。

頻繁に使用されるユースケースの場合は、データベースを使用する方がよい場合があります。

0

あなたは、代わりにApacheのORCファイル形式を見て見ることができます:

https://orc.apache.org/docs/acid.html

あなたはHDFSのトップに滞在する場合はご利用の場合、またはHBaseのよります。

しかし、HDFSは書き込み可能なファイルシステムであり、必要がない場合は別のもの(おそらくelasticsearch、mongodb)を使用してください。

また、HDFSでは、毎回新しいファイルを作成する必要があります。差分ファイルを作成してOLD + DELTA = NEW_DATAをマージするには、増分プロセスを設定する必要があります。