2016-11-19 12 views
0

に各パーティションの内容をダンプし、私は次のようになり、そのデータフレームDF1にいくつかのデータをロードするために、スパーク1.6.2 Java APIを使用しています:特定の列に基づいてパーティションスパークDATAFRAMEおよびCSV

Key Value 
A v1 
A v2 
B v3 
A v4 

これで、 "Key"列の値のサブセットに基づいてDF1を分割し、各パーティションをcsvファイルに(spark-csvを使用して)ダンプする必要があります。

所望の出力:

A.csv

Key Value 
A v1 
A v2 
A v4 

B.csv

Key Value 
B v3 

瞬間に私がやっていることのサブセットを含むHashMapの(はmyList)を構築しています私はフィルタリングする必要があるし、各反復ごとに別のKeyをフィルタリングして反復する必要があります。あなただけのファイルを分割なる、partitionByを追加する必要があり、あなたがほとんど存在し

DF1 = <some operations>.cache(); 

for (Object filterKey: myList.keySet()) { 
    DF2 = DF1.filter((String)myList.get(filterKey)); 

    DF2.write().format.format("com.databricks.spark.csv") 
      .option("header", "true") 
     .save("/" + filterKey + ".csv"); 
} 

答えて

1

:次のコードで、私は私が望む結果を得るが、それを行うために、より効率的な方法がある場合、私は思ったんだけどあなたが望む方法で。

DF1 
    .filter{case(key, value) => myList.contains(key)) 
    .write 
    .partitionBy("key") 
    .format("com.databricks.spark.csv") 
    .option("header", "true") 
    .save("/my/basepath/") 

ファイルが今の下に保存されます "/私/はBasePath /キー= A /"、 "/私/はBasePath /キー= B /" など。

関連する問題