2017-07-19 27 views
0

スカラーのパスのSequenceにあるファイルを読み込もうとしています。以下はサンプル(疑似)コードです:スパーク:パスが存在する場合のみファイルを読み込みます。

ここで、上記のシーケンスでは、いくつかのパスが存在するのに対し、いくつかのパスは存在しません。 parquetファイルを読み取っている間に、欠落しているパスを無視する方法はありますか(org.apache.spark.sql.AnalysisException: Path does not existを避けるため)?

私は以下試してみました、それが動作しているようだが、その後、私は私がやって避けたいものです二度同じパスを読み終わる:

val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess) 

私はoptions方法を確認DataFrameReaderでも、それにはignore_if_missingに似たオプションはありません。

また、これらのパスはhdfsまたはs3ことができます(このSeqは、メソッドの引数として渡されます)と読んでいる間、私はパスがs3hdfsであるかどうかわからない程度にs3またはhdfs特定のAPIを使用することはできません存在を確認してください。例えば

paths.filter(f => new java.io.File(f).exists) 

Seq("/tmp", "xx").filter(f => new java.io.File(f).exists) 
// res18: List[String] = List(/tmp) 

答えて

1

。 sparkでは、そうする最善の方法は、内部スパークハーフープ設定を使用することです。スパークセッション変数が「スパーク」と呼ばれる場合、次の操作を実行できます。

import org.apache.hadoop.fs.FileSystem 
import org.apache.hadoop.fs.Path 

val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) 

def testDirExist(path: String): Boolean = { 
    val p = new Path(path) 
    hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory 
} 
val filteredPaths = paths.filter(p => testDirExists(p)) 
val dataframe = spark.read.parquet(filteredPaths: _*) 
+0

'Paths'はローカルの' hdfs'パスまたは 's3'パスです。 'File.exists'が' s3'で動作するかどうかは不明です。 –

+1

パスがHDFS/S3パス(Sparkでよく使用されるパス)である場合、パスの存在を確認するために少し異なるAPIが必要になります。 [@DarshanMehtaあなたは3秒で私にそれを打つ:)] –

+0

@ TzachZoharハハはい。私は今質問を更新しました。 –

4

あなたは、@のPsidomの答えのように無関係なファイルを除外できpaths firstly`をフィルタリングについてどのように

+0

システム設定によっては、get: 'FileSystem.get(新しいURI(" s3:// bucket ")、spark.sparkContext.hadoopConfiguration)'でファイルシステムの場所を指定する必要があります。そうしないと、S3ファイルシステムのパスをチェックする際にHDFSファイルシステムとbarfが作成される可能性があります。 – Azuaron

0

多分このようなものがあなたに役立つでしょうか?

def read(path: Seq[String]): Try[DataFrame] = Try(spark.read.parquet(p)) 


read("somePath") match { 
    case Success(df) => df.show() 
    case Failure(_) => Unit 
} 
関連する問題