Apache Sparkには、現在処理中のファイル名を持つ新しい列をDatasetに追加するために使用されるinput_file_name関数があります。Spark SQLのパスからファイル名のみを抽出するUDF
問題は、この関数をファイル名だけを返すようにカスタマイズして、s3のフルパスを省略することです。
今、私はマップ機能を使用して、第2段階上のパスの交換を行っています場合:
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", input_file_name)
...
...
def fromFile(fileName: String): String = {
val baseName: String = FilenameUtils.getBaseName(fileName)
val tmpFileName: String = baseName.substring(0, baseName.length - 8) //here is magic conversion ;)
this.valueOf(tmpFileName)
}
しかし、私はScalaで
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", **customized_input_file_name_function**)
'.withColumn( "input_file_name"、get_only_file_name(input_file_name))'。ここで 'get_only_file_name'はudfです。 – mrsrinivas