私は、Spark 2.1を使用してHDFS 2.7にデータを書き込む方法をテストしようとしています。私のデータはダミー値のシンプルなシーケンスで、出力は属性:idとのキーで分割する必要があります。Spark 2.1で分割された寄木細工ファイルを保存する方法は?
私はHDFSに次のツリー構造を取得するために期待してい// Simple case class to cast the data
case class SimpleTest(id:String, value1:Int, value2:Float, key:Int)
// Actual data to be stored
val testData = Seq(
SimpleTest("test", 12, 13.5.toFloat, 1),
SimpleTest("test", 12, 13.5.toFloat, 2),
SimpleTest("test", 12, 13.5.toFloat, 3),
SimpleTest("simple", 12, 13.5.toFloat, 1),
SimpleTest("simple", 12, 13.5.toFloat, 2),
SimpleTest("simple", 12, 13.5.toFloat, 3)
)
// Spark's workflow to distribute, partition and store
// sc and sql are the SparkContext and SparkSession, respectively
val testDataP = sc.parallelize(testData, 6)
val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key")
testDf.write.partitionBy("id", "key").parquet("/path/to/file")
:
- /path/to/file
|- /id=test/key=1/part-01.parquet
|- /id=test/key=2/part-02.parquet
|- /id=test/key=3/part-03.parquet
|- /id=simple/key=1/part-04.parquet
|- /id=simple/key=2/part-05.parquet
|- /id=simple/key=3/part-06.parquet
をしかし、私は前のコードを実行したときに、私は次の出力を得る:
/path/to/file/id=/key=24/
|-/part-01.parquet
|-/part-02.parquet
|-/part-03.parquet
|-/part-04.parquet
|-/part-05.parquet
|-/part-06.parquet
を私は知りませんコードに何か問題がある場合、またはSparkがやっていることが他にある場合。
次のように私はを実行しています:
--executor-コアローカル--driverメモリ30G --executorメモリ30G --master --name APPを火花提出8 --num -executors 8 --conf spark.io.compression.codec = lzf --conf spark.akka.frameSize = 1024 --conf spark.driver.maxResultSize = 1g --conf spark.sql.orc.compression.codec =非圧縮 - -conf spark.sql.parquet.filterPushdown = MyClassのmyFatJar.jar