2016-05-16 23 views
1

私は、Spark StreamingがfileStream()メソッドでピックアップする各ファイルの名前を見つける方法を探していました。 Javaには部分的な解決策がいくつかありますが、Scalaの例は見つかりませんでした。 FileInputFormatを使用して、完全ではない推奨事項もありますが、これは使用方法が明確ではありません。 Scalaのサンプルコードは大変ありがとうございます。スカラでfileStream()を使用してSpark Streamingでファイル名の名前を調べる方法は?

答えて

3

これは私が同様の質問に対する回答の数を見つけると組み合わせることで、これを解決する方法である:

def fileNameFilter(path: Path): Boolean = { 
    if (path.getName().contains("COPYING")) { 
     logger.info("*** ignoring incomplete file: " + path.getName()) 
     return false 
    } else { 
     return true 
    } 
} 

def deleteFile(sc: SparkContext, fileName: String): Unit = { 
    val filePath = new Path(fileName) 
    val fs = FileSystem.get(new Configuration()) 
    if (fs.isDirectory(filePath)) { 
     fs.listStatus(filePath).foreach((status) => { 
      fs.delete(status.getPath(), true) 
     }) 
    } else { 
     fs.delete(filePath, true) 
    } 
} 

val ssc = new StreamingContext(sc, Seconds(5)) 
val mfStream = ssc.fileStream[LongWritable,Text,TextInputFormat](pathToMyFiles, x=>fileNameFilter(x), true) 
mfStream.foreachRDD(rdd => { 
....some business logic 
if (!rdd.partitions.isEmpty) { 
    regExp.findAllMatchIn(rdd.toDebugString).foreach(name => { 
    logger.info("Deleting processed File(s): " + name.toString) 
    deleteFile(sc, name.toString) 
}) 
} 

}) 

が、これは同様のニーズに他人を助けることを願っています...

関連する問題