これは私が同様の質問に対する回答の数を見つけると組み合わせることで、これを解決する方法である:
def fileNameFilter(path: Path): Boolean = {
if (path.getName().contains("COPYING")) {
logger.info("*** ignoring incomplete file: " + path.getName())
return false
} else {
return true
}
}
def deleteFile(sc: SparkContext, fileName: String): Unit = {
val filePath = new Path(fileName)
val fs = FileSystem.get(new Configuration())
if (fs.isDirectory(filePath)) {
fs.listStatus(filePath).foreach((status) => {
fs.delete(status.getPath(), true)
})
} else {
fs.delete(filePath, true)
}
}
val ssc = new StreamingContext(sc, Seconds(5))
val mfStream = ssc.fileStream[LongWritable,Text,TextInputFormat](pathToMyFiles, x=>fileNameFilter(x), true)
mfStream.foreachRDD(rdd => {
....some business logic
if (!rdd.partitions.isEmpty) {
regExp.findAllMatchIn(rdd.toDebugString).foreach(name => {
logger.info("Deleting processed File(s): " + name.toString)
deleteFile(sc, name.toString)
})
}
})
が、これは同様のニーズに他人を助けることを願っています...