2016-07-20 5 views
15

sparkではなく特定のパーティションを上書きしたい。私は次のコマンドを試しています:sparkデータフレームの書き込み方法で特定のパーティションを上書きする

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4') 

ここで、dfは上書きされるインクリメンタルデータを持つデータフレームです。

hdfs-base-pathにはマスターデータが含まれています。

上記のコマンドを実行すると、すべてのパーティションが削除され、hdfsパスにdf内のパーティションが挿入されます。

私の要件は、指定されたhdfsパスでdfに存在するパーティションだけを上書きすることです。誰かがこれで私を助けてくれますか?

答えて

13

これは一般的な問題です。 2.0までのスパークを持つ唯一の解決策は、彼らがするので、(あなたが2.0より前にスパークを使用している場合は、メタデータファイルを発光からスパークを停止する必要があります、例えば、パーティションのディレクトリに直接

df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value") 

を書くことです使用して)自動パーティションの発見を破る:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") 

あなたが前に1.6.2へのスパークを使用している場合は、自動パーティション発見を中断します/root/path/to/data/partition_col=valueまたはその存在に_SUCCESSファイルを削除する必要があります。 (私は強く1.6.2以降を使用することをお勧めします)

大きなパーティションテーブルを管理する方法については、Spark Summitのトーク​​でさらに詳しく知ることができます。

+0

おかげでたくさんのシムを。最初のデータフレームに約100個のパーティションのデータがあると仮定すると、このデータフレームを別の100個のデータフレームに分割してそれぞれのパーティションの値で分割してパーティションディレクトリに直接挿入する必要があります。これらの100個のパーティションを同時に保存することはできますか?また、Spark 1.6.1を使用しています。orcファイル形式を使用している場合、どのようにメタデータファイルの出力を停止できますか? – yatin

+0

Re:メタデータ、いいえ、ORCは別の形式ですが、データ以外のファイルを生成しないと思います。 1.6.1では、パーティションツリーのサブディレクトリにORCファイルのみが必要です。したがって、 '_SUCCESS'を手で削除する必要があります。複数のパーティションに並列に書き込むことはできますが、同じジョブから書き込むことはできません。プラットフォーム機能に基づいて複数のジョブを開始します(REST APIを使用)。 – Sim

+3

これについての最新情報はありますか? saveToTable()は特定のパーティションだけを上書きしますか?どのパーティションが上書きされたかを知るのに十分なほどスマートな印象を与えますか? –

4

Spark 1.6を使用...

HiveContextはこのプロセスを大幅に簡素化できます。重要な点は、最初にパーティションを定義したCREATE EXTERNAL TABLEステートメントを使用してHiveでテーブルを作成する必要があることです。たとえば、

# Hive SQL 
CREATE EXTERNAL TABLE test 
(name STRING) 
PARTITIONED BY 
(age INT) 
STORED AS PARQUET 
LOCATION 'hdfs:///tmp/tables/test' 

ここから、特定のパーティション(または複数のパーティション)の新しいレコードを含むDataframeがあるとします。あなたは、データフレームに含まれているパーティションのみのテーブルを上書きします。このデータフレームを使用して、INSERT OVERWRITEを実行するためにHiveContext SQLステートメントを使用することができます。

# PySpark 
hiveContext = HiveContext(sc) 
update_dataframe.registerTempTable('update_dataframe') 

hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age) 
        SELECT name, age 
        FROM update_dataframe""") 

注:この例ではupdate_dataframeはのことを一致するスキーマを持っていますターゲットtestテーブル。

このアプローチで簡単に間違えるのは、HiveのCREATE EXTERNAL TABLEステップをスキップし、Dataframe APIの書き込みメソッドを使用してテーブルを作成することです。特にParquetベースのテーブルの場合、テーブルはHiveのINSERT OVERWRITE... PARTITION関数をサポートするために適切に定義されません。

これが役に立ちます。

+0

私は上記のアプローチを試しました、私は '動的パーティション厳密モードのようなエラーが発生している少なくとも1つの静的パーティションの列が必要です。この設定をオフにするにはhive.exec.dynamic.partition.mode = nonstrict' – Shankar

+0

私は静的パーティションの列を持っていません – Shankar

0

DataFrameを使用している場合は、データ上にHiveテーブルを使用することもできます。あなたが必要とする。この場合 はちょうどそれは、データフレームが含まれているパーティションを上書きします

df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name) 

メソッドを呼び出します。

SparkはHiveテーブル形式を使用するため、フォーマット(orc)を指定する必要はありません。

それはあなたが仕事のリエントラント(冪等)を作るために、このような何かを行うことができます1.6

0

スパークバージョンで正常に動作します:答えるため (スパーク2.2でこれを試してみました)

# drop the partition 
drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition) 
print drop_query 
spark.sql(drop_query) 

# delete directory 
dbutils.fs.rm(<partition_directoy>,recurse=True) 

# Load the partition 
df.write\ 
    .partitionBy("partition_col")\ 
    .saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>) 
関連する問題