2017-05-02 5 views
4

私は、Spark 2.1を使用してHDFS 2.7にデータを書き込む方法をテストしようとしています。私のデータはダミー値のシンプルなシーケンスで、出力は属性:idのキーで分割する必要があります。Spark 2.1で分割された寄木細工ファイルを保存する方法は?

私はHDFSに次のツリー構造を取得するために期待してい
// Simple case class to cast the data 
case class SimpleTest(id:String, value1:Int, value2:Float, key:Int) 

// Actual data to be stored 
val testData = Seq(
    SimpleTest("test", 12, 13.5.toFloat, 1), 
    SimpleTest("test", 12, 13.5.toFloat, 2), 
    SimpleTest("test", 12, 13.5.toFloat, 3), 
    SimpleTest("simple", 12, 13.5.toFloat, 1), 
    SimpleTest("simple", 12, 13.5.toFloat, 2), 
    SimpleTest("simple", 12, 13.5.toFloat, 3) 
) 

// Spark's workflow to distribute, partition and store 
// sc and sql are the SparkContext and SparkSession, respectively 
val testDataP = sc.parallelize(testData, 6) 
val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key") 
testDf.write.partitionBy("id", "key").parquet("/path/to/file") 

- /path/to/file 
    |- /id=test/key=1/part-01.parquet 
    |- /id=test/key=2/part-02.parquet 
    |- /id=test/key=3/part-03.parquet 
    |- /id=simple/key=1/part-04.parquet 
    |- /id=simple/key=2/part-05.parquet 
    |- /id=simple/key=3/part-06.parquet 

をしかし、私は前のコードを実行したときに、私は次の出力を得る:

/path/to/file/id=/key=24/ 
|-/part-01.parquet 
|-/part-02.parquet 
|-/part-03.parquet 
|-/part-04.parquet 
|-/part-05.parquet 
|-/part-06.parquet 

を私は知りませんコードに何か問題がある場合、またはSparkがやっていることが他にある場合。

次のように私は​​を実行しています:

--executor-コアローカル--driverメモリ30G --executorメモリ30G --master --name APPを火花提出8 --num -executors 8 --conf spark.io.compression.codec = lzf --conf spark.akka.frameSize = 1024 --conf spark.driver.maxResultSize = 1g --conf spark.sql.orc.compression.codec =非圧縮 - -conf spark.sql.parquet.filterPushdown = MyClassのmyFatJar.jar

答えて

1

私は解決策を見つけました! Cloudera氏によると、mapred-site.xml設定に問題があります(下記のリンクを確認してください)。また、データフレームをtestDf.write.partitionBy("id", "key").parquet("/path/to/file")

として書き込む代わりに、次のようにしました:testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file")<namenode><port>をそれぞれHDFSのmasternode名とポートで置き換えることができます。

@ jacek-laskowskiに感謝します。

参考文献:

https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/m-p/36363#M1090

Writing to HDFS in Spark/Scala

5

興味深い--class真も..."それは私の作品" ...以来。

SimpleTestケースクラスを使用してSpark 2.1でデータセットを記述すると、タイプDatasetimport spark.implicits._になります。

私の場合、sparksqlです。

つまり、testDataPtestDfsql.createDataFrameを使用)を作成する必要はありません。 (/tmp/testDfディレクトリに保存した後)別の端末で

import spark.implicits._ 
... 
val testDf = testData.toDS 
testDf.write.partitionBy("id", "key").parquet("/path/to/file") 

$ tree /tmp/testDf/ 
/tmp/testDf/ 
├── _SUCCESS 
├── id=simple 
│   ├── key=1 
│   │   └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
│   ├── key=2 
│   │   └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
│   └── key=3 
│    └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
└── id=test 
    ├── key=1 
    │   └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
    ├── key=2 
    │   └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 
    └── key=3 
     └── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet 

8 directories, 7 files