spark sqlの場合、HDFSの1つのフォルダからデータをフェッチし、いくつかの変更を加え、更新されたデータを同じフォルダHDFSでを介して上書き保存モード FileNotFoundExceptionを取得せずに?Spark SQL SaveMode.Overwrite、java.io.FileNotFoundExceptionを取得し、 'REFRESH TABLE tableName'を要求する
import org.apache.spark.sql.{SparkSession,SaveMode}
import org.apache.spark.SparkConf
val sparkConf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")
val newDF = df.select("a","b","c")
newDF.write.mode(SaveMode.Overwrite)
.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work
newDF.write.mode(SaveMode.Overwrite)
.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works
にFileNotFoundExceptionが、我々はHDFSディレクトリからデータを読み取る際に、 "2017年3月20日= D" が起こる、と保存(SaveMode.Overwrite)DIR同じHDFSへの更新データ "D = 2017年3月20日"
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:157)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
... 8 more
次の試行でも同じエラーが発生しますが、spark sqlを使用してこの問題を解決するにはどうすればよいですか?ありがとうございました!
val hdfsDirPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"
val df= sparkSession.read.parquet(hdfsDirPath)
val newdf = df
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)
または
val df= sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")
sparkSession.sql("TRUNCATE TABLE orgtable")
sparkSession.sql("INSERT INTO orgtable SELECT * FROM tmptable")
val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)
または
val df= sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")
sparkSession.sql("REFRESH TABLE orgtable")
sparkSession.sql("ALTER VIEW tmptable RENAME TO orgtable")
val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)
私はあなたが私たちはそれを読んでいるところからファイルへのスパークデータフレームを書き込むことができない、他のディレクトリに保存してから元のディレクトリに –
をコピーする必要が行うことができると思ういけません。それでもやりたいのであれば、まず一時ディレクトリにDFを書いてから、 'SaveMode.Overwrite'でディレクトリを書いてください。 – himanshuIIITian
AkashとhimanshuIIITian、HDFSの一時ディレクトリにDFを書くことは良い解決策ですが、この問題を解決するためにSpark SQLを使用できる方法があるのだろうかと思っていましたか? HDFSからのデータの書き込みと取り出しは、メモリ内のSpark SQLを使用するよりも時間と空間の効率が低いため、問題を解決するためにREFRESH、TRUNCATE、またはDROPテーブルを使用できますか? – faustineinsun