2016-11-28 13 views
3

Apache Sparkには、現在処理中のファイル名を持つ新しい列をDatasetに追加するために使用されるinput_file_name関数があります。Spark SQLのパスからファイル名のみを抽出するUDF

問題は、この関数をファイル名だけを返すようにカスタマイズして、s3のフルパスを省略することです。

今、私はマップ機能を使用して、第2段階上のパスの交換を行っています場合:

val initialDs = spark.sqlContext.read 
.option("dateFormat", conf.dateFormat) 
.schema(conf.schema) 
.csv(conf.path).withColumn("input_file_name", input_file_name) 
... 
... 
def fromFile(fileName: String): String = { 
    val baseName: String = FilenameUtils.getBaseName(fileName) 
    val tmpFileName: String = baseName.substring(0, baseName.length - 8) //here is magic conversion ;) 
    this.valueOf(tmpFileName) 
} 

しかし、私はScalaで

val initialDs = spark.sqlContext.read 
    .option("dateFormat", conf.dateFormat) 
    .schema(conf.schema) 
    .csv(conf.path).withColumn("input_file_name", **customized_input_file_name_function**) 
+0

'.withColumn( "input_file_name"、get_only_file_name(input_file_name))'。ここで 'get_only_file_name'はudfです。 – mrsrinivas

答えて

5

のようなものを使用したい:

#register udf 
spark.udf 
    .register("get_only_file_name", (fullPath: String) => fullPath.split("/").last) 

#use the udf to get last token(filename) in full path 
val initialDs = spark.read 
    .option("dateFormat", conf.dateFormat) 
    .schema(conf.schema) 
    .csv(conf.path) 
    .withColumn("input_file_name", get_only_file_name(input_file_name)) 

編集:In Ja VAコメントどおり

#register udf 
spark.udf() 
    .register("get_only_file_name", (String fullPath) -> { 
    int lastIndex = fullPath.lastIndexOf("/"); 
    return fullPath.substring(lastIndex, fullPath.length - 1); 
    }, DataTypes.StringType); 

import org.apache.spark.sql.functions.input_file_name  

#use the udf to get last token(filename) in full path 
Dataset<Row> initialDs = spark.read() 
    .option("dateFormat", conf.dateFormat) 
    .schema(conf.schema) 
    .csv(conf.path) 
    .withColumn("input_file_name", get_only_file_name(input_file_name())); 
+1

ありがとう、それはトリックでした! – cingulata

+0

@ Anandj.Kadhi:私はそれが非常に遅く応答したことを知っています、Plsは一度更新をチェックします。 – mrsrinivas

関連する問題