2017-07-05 22 views
0

私はJava Sparkのドキュメントを理解しようとしています。 型定義されていないユーザー定義集計関数には、理解できないサンプルコードがいくつかあります。ユーザー定義集計関数の作成方法は?

  • 私はUDFを作成したいときはいつでも、私は機能initializeupdatemergeている必要があります

    package org.apache.spark.examples.sql; 
    
    // $example on:untyped_custom_aggregation$ 
    import java.util.ArrayList; 
    import java.util.List; 
    
    import org.apache.spark.sql.Dataset; 
    import org.apache.spark.sql.Row; 
    import org.apache.spark.sql.SparkSession; 
    import org.apache.spark.sql.expressions.MutableAggregationBuffer; 
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; 
    import org.apache.spark.sql.types.DataType; 
    import org.apache.spark.sql.types.DataTypes; 
    import org.apache.spark.sql.types.StructField; 
    import org.apache.spark.sql.types.StructType; 
    // $example off:untyped_custom_aggregation$ 
    
    public class JavaUserDefinedUntypedAggregation { 
    
        // $example on:untyped_custom_aggregation$ 
        public static class MyAverage extends UserDefinedAggregateFunction { 
    
        private StructType inputSchema; 
        private StructType bufferSchema; 
    
        public MyAverage() { 
         List<StructField> inputFields = new ArrayList<>(); 
         inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true)); 
         inputSchema = DataTypes.createStructType(inputFields); 
    
         List<StructField> bufferFields = new ArrayList<>(); 
         bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true)); 
         bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true)); 
         bufferSchema = DataTypes.createStructType(bufferFields); 
        } 
        // Data types of input arguments of this aggregate function 
        public StructType inputSchema() { 
         return inputSchema; 
        } 
        // Data types of values in the aggregation buffer 
        public StructType bufferSchema() { 
         return bufferSchema; 
        } 
        // The data type of the returned value 
        public DataType dataType() { 
         return DataTypes.DoubleType; 
        } 
        // Whether this function always returns the same output on the identical input 
        public boolean deterministic() { 
         return true; 
        } 
        // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to 
        // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides 
        // the opportunity to update its values. Note that arrays and maps inside the buffer are still 
        // immutable. 
        public void initialize(MutableAggregationBuffer buffer) { 
         buffer.update(0, 0L); 
         buffer.update(1, 0L); 
        } 
        // Updates the given aggregation buffer `buffer` with new input data from `input` 
        public void update(MutableAggregationBuffer buffer, Row input) { 
         if (!input.isNullAt(0)) { 
         long updatedSum = buffer.getLong(0) + input.getLong(0); 
         long updatedCount = buffer.getLong(1) + 1; 
         buffer.update(0, updatedSum); 
         buffer.update(1, updatedCount); 
         } 
        } 
        // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` 
        public void merge(MutableAggregationBuffer buffer1, Row buffer2) { 
         long mergedSum = buffer1.getLong(0) + buffer2.getLong(0); 
         long mergedCount = buffer1.getLong(1) + buffer2.getLong(1); 
         buffer1.update(0, mergedSum); 
         buffer1.update(1, mergedCount); 
        } 
        // Calculates the final result 
        public Double evaluate(Row buffer) { 
         return ((double) buffer.getLong(0))/buffer.getLong(1); 
        } 
        } 
        // $example off:untyped_custom_aggregation$ 
    
        public static void main(String[] args) { 
        SparkSession spark = SparkSession 
         .builder() 
         .appName("Java Spark SQL user-defined DataFrames aggregation example") 
         .getOrCreate(); 
    
        // $example on:untyped_custom_aggregation$ 
        // Register the function to access it 
        spark.udf().register("myAverage", new MyAverage()); 
    
        Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json"); 
        df.createOrReplaceTempView("employees"); 
        df.show(); 
        // +-------+------+ 
        // | name|salary| 
        // +-------+------+ 
        // |Michael| 3000| 
        // | Andy| 4500| 
        // | Justin| 3500| 
        // | Berta| 4000| 
        // +-------+------+ 
    
        Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees"); 
        result.show(); 
        // +--------------+ 
        // |average_salary| 
        // +--------------+ 
        // |  3750.0| 
        // +--------------+ 
        // $example off:untyped_custom_aggregation$ 
    
        spark.stop(); 
        } 
    } 
    

    私の疑問上記のコードに関連しています。ここでは、コードのですか?

  • 変数inputSchemabufferSchemaの重要性は何ですか?私は彼らがDataFramesをまったく作成するために使われることはないので、彼らが存在するのに驚いています。彼らはすべてのUDFに存在するはずですか?はいの場合、それらはまったく同じ名前であるはずですか?
  • inputSchemabufferSchemaのゲッターは、getInputSchema()getBufferSchema()という名前が付けられていないのはなぜですか?なぜこれらの変数の設定者がいないのですか?
  • deterministic()という関数の意味は何ですか?この関数を呼び出すと便利なシナリオを教えてください。

一般的に私はSparkでユーザー定義集計関数を書く方法を知りたいと思っています。

私はUDFを作成したいときはいつでも、私は機能を初期化する必要があります

答えて

1

、更新および

UDF方法initializeしばらくユーザー定義関数用スタンド、update、およびmergeをマージユーザー定義集約関数(別名UDAF)です。

UDFは、1行で(通常は)1行を生成する関数です(例:upper関数)。

UDAFは、0行または複数行で1行を生成する関数です(例:count集合関数)。

ユーザー定義関数(UDF)の場合、関数initializeupdateおよびmergeを必ずしも持っている必要はありません。

udffunctionsのいずれかを使用して、UDFを定義して登録します。スパークでのユーザ定義の集約関数の書き方に

val myUpper = udf { (s: String) => s.toUpperCase } 

方法。

変数inputSchemabufferSchemaの意義は何ですか?

Untyped User-Defined Aggregate Functionsを引用(恥知らずなプラグUserDefinedAggregateFunction — Contract for User-Defined Aggregate Functions (UDAFs)で私はマスタリングApacheのスパークでそれらを説明してきた2 gitbook):つまり

// Data types of input arguments of this aggregate function 
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) 

// Data types of values in the aggregation buffer 
def bufferSchema: StructType = { 
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) 
} 

inputSchemaあなたが入力から何を期待しているのですか?bufferSchemaはあなたが一時的にあなたが何をしているのかg凝集。

なぜこれらの変数の設定者はありませんか?

これらはSparkによって管理される拡張ポイントです。

ここでdeterministic()と呼ばれる機能の意義は何ですか?

Untyped User-Defined Aggregate Functionsを引用:

// Whether this function always returns the same output on the identical input 
def deterministic: Boolean = true 

この関数を呼び出すことは有用であろうシナリオをお願いします。

これは私がまだ取り組んでいるものなので、今日は答えられません。

関連する問題