2017-03-21 2 views
3

spark sqlの場合、HDFSの1つのフォルダからデータをフェッチし、いくつかの変更を加え、更新されたデータを同じフォルダHDFSでを介して上書き保存モード FileNotFoundExceptionを取得せずに?Spark SQL SaveMode.Overwrite、java.io.FileNotFoundExceptionを取得し、 'REFRESH TABLE tableName'を要求する

import org.apache.spark.sql.{SparkSession,SaveMode} 
import org.apache.spark.SparkConf 

val sparkConf: SparkConf = new SparkConf() 
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate() 
val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") 
val newDF = df.select("a","b","c") 

newDF.write.mode(SaveMode.Overwrite) 
    .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work 
newDF.write.mode(SaveMode.Overwrite) 
    .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works 

にFileNotFoundExceptionが、我々はHDFSディレクトリからデータを読み取る際に、 "2017年3月20日= D" が起こる、と保存(SaveMode.Overwrite)DIR同じHDFSへの更新データ "D = 2017年3月20日"

Caused by: org.apache.spark.SparkException: Task failed while writing rows 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet 
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved. 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:157) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188) 
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193) 
    ... 8 more 

次の試行でも同じエラーが発生しますが、spark sqlを使用してこの問題を解決するにはどうすればよいですか?ありがとうございました!

val hdfsDirPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20" 

val df= sparkSession.read.parquet(hdfsDirPath) 

val newdf = df 
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath) 

または

val df= sparkSession.read.parquet(hdfsDirPath) 
df.createOrReplaceTempView("orgtable") 
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable") 

sparkSession.sql("TRUNCATE TABLE orgtable") 
sparkSession.sql("INSERT INTO orgtable SELECT * FROM tmptable") 

val newdf = sparkSession.sql("SELECT * FROM orgtable") 
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath) 

または

val df= sparkSession.read.parquet(hdfsDirPath) 
df.createOrReplaceTempView("orgtable") 
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable") 

sparkSession.sql("REFRESH TABLE orgtable") 
sparkSession.sql("ALTER VIEW tmptable RENAME TO orgtable") 

val newdf = sparkSession.sql("SELECT * FROM orgtable") 
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath) 
+1

私はあなたが私たちはそれを読んでいるところからファイルへのスパークデータフレームを書き込むことができない、他のディレクトリに保存してから元のディレクトリに –

+1

をコピーする必要が行うことができると思ういけません。それでもやりたいのであれば、まず一時ディレクトリにDFを書いてから、 'SaveMode.Overwrite'でディレクトリを書いてください。 – himanshuIIITian

+0

AkashとhimanshuIIITian、HDFSの一時ディレクトリにDFを書くことは良い解決策ですが、この問題を解決するためにSpark SQLを使用できる方法があるのだろうかと思っていましたか? HDFSからのデータの書き込みと取り出しは、メモリ内のSpark SQLを使用するよりも時間と空間の効率が低いため、問題を解決するためにREFRESH、TRUNCATE、またはDROPテーブルを使用できますか? – faustineinsun

答えて

3

私はtempディレクトリに自分のデータフレームを書き、そして私が読んでソースを削除、およびに一時ディレクトリの名前を変更まず、これを解決ソース名。 QAQ

+0

ありがとうございます!問題を解決するためにREFRESH、TRUNCATE、またはDROPテーブルを使用できるかどうか疑問に思っていましたか?メモリ内での操作はディスク上の同じ作業よりも効率的です – faustineinsun

2

読んだ後にキャッシュしないでください。別のファイルディレクトリに保存してからディレクトリを移動すると、余分な権限が必要になることがあります。私もshow()のような行動を強制しています。

val myDF = spark.read.format("csv") 
    .option("header", "false") 
    .option("delimiter", ",") 
    .load("/directory/tofile/") 


myDF.cache() 
myDF.show(2) 
+0

'cache()'は 'persist(StorageLevel.MEMORY_ONLY)'のエイリアスです。これはクラスタメモリより大きいデータセットには適していません。 'persist(StorageLevel.MEMORY_AND_DISK_ONLY)'がより良い解決策になりますが、メモリが足りない場合はデータをローカルに保存します。私は前に 'persist'を使いましたが、うまくいかなかったようです。 – faustineinsun

+0

キャッシュの後にアクションを呼び出そうとしましたか? "上書き"はまだあなたの最初のアクションですか? –

関連する問題