私はJSONファイルからデータを読み込み、それがファイルを寄木細工への書き込みにスパークジョブを書いていますが存在する場合(寄木細工のファイルで)レコードを更新しますが、以下の例のコードです:スパークすでに
DataFrame dataFrame = new DataFrameReader(sqlContext).json(textFile);
dataFrame = dataFrame.withColumn("year", year(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
dataFrame = dataFrame.withColumn("month", month(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
dataFrame.write().mode(SaveMode.Append).partitionBy("year", "month").parquet("<some_path>");
JSONファイルはたくさんのjsonレコードで構成されています。レコードが既に存在する場合は、そのレコードを寄木張りで更新します。 Append
モードを試しましたが、レコードレベルではなくファイルレベルで動作しているようです(つまり、ファイルがすでに存在する場合は、最後に書き込みます)。したがって、同じファイルに対してこのジョブを実行すると、レコードが複製されます。
データフレームの行IDを一意のキーとして指定する方法はありますか?それが既に存在する場合、レコードを更新するようsparkに依頼してください。すべての保存モードは、ファイルではなくレコードをチェックしているようです。