0

登録後にcallUDFを使用してudfを呼び出そうとしています。ただし、関数validateNumber()は呼び出されません。callUDF()でUDFを呼び出せません - Spark Java

コードは以下のようになります。私がしようとしています

public Dataset<Row> sampleCallUdf(Dataset<Row> dataset) { 

    UDF2<Long, Long, String> validateNumber = (UDF2<Long, Long, String>) SampleClass::validateNumber; 
    UDFRegistration udfRegister = CONFIG.getSparkSession().udf(); 
    udfRegister.register("validateNumber", validateNumber, DataTypes.StringType); 

    return dataset.withColumn("rejection_reason", 
        coalesce(
          callUDF("validateNumber", column("cookie"), column("session")))); 
    } 

    public static String validateNumber(Long cookie, Long session) { 
      System.out.println("Into validateNumber function"); 
      if(cookie != 0){ 
      return "correct"; 
      }else{ 
      return "incorrect"; 
      } 
    } 

入力は次のとおりです。

Dataset<Row> input = spark().createDataFrame(Arrays.asList(
       RowFactory.create("28/05/2017 00:12:34", 0L, -2864001245604480000L, "abc" ,"90.202.190.106", 123, "abc", "xyz", "mno"), 
       RowFactory.create("28/05/2017 00:12:34", 2345678L, 2864001245604480000L, "abc" ,"90.202.190.106", 123, "abc", "xyz", "mno")), TEMP_TABLE); 

問題はそれもvalidateNumber()関数内でSYSOUT文を印刷していない、です。

+0

私のためにうまく動作します。データセットの値を確認してください。 – abaghel

+0

@abaghel - validateNumber()に入りましたか? – anukuls

+0

また、あなたが入力していることを教えてください。 – anukuls

答えて

0

以下のサンプルプログラムをご覧ください。

public class SparkUDF { 
    public static void main(String[] args) throws Exception { 
    SparkSession spark = SparkSession 
      .builder() 
      .appName("SparkUDF") 
      .master("local[*]") 
      .getOrCreate(); 
    //data 
    List<Tuple2<Long, Long>> inputList = new ArrayList<Tuple2<Long, Long>>(); 
    inputList.add(new Tuple2<Long, Long>(111l, 10011l)); 
    inputList.add(new Tuple2<Long, Long>(0l, 20022l)); 
    //Dataset 
    Dataset<Row> ds = spark.createDataset(inputList, Encoders.tuple(Encoders.LONG(), Encoders.LONG())).toDF("cookie", "session"); 
    //udf 
    UDF2<Long, Long, String> validateNumber = (UDF2<Long, Long, String>) SparkUDF::validateNumber; 
    spark.udf().register("validateNumber", validateNumber, DataTypes.StringType); 
    Dataset<Row> ds1 = ds.withColumn("rejection_reason",coalesce(callUDF("validateNumber", col("cookie"), col("session")))); 
    ds1.show(); 
    spark.stop(); 
} 

public static String validateNumber(Long cookie, Long session) { 
    if (cookie != 0) { 
     return "correct"; 
    } else { 
     return "incorrect"; 
    } 
    } 
} 

出力は次のようになります。

+------+-------+----------------+ 
|cookie|session|rejection_reason| 
+------+-------+----------------+ 
| 111| 10011|   correct| 
|  0| 20022|  incorrect| 
+------+-------+----------------+ 
関連する問題