セグメント列に対して任意のmap操作を適用し、基礎となるLineStringのメソッドを使用することができました。Spark 2.0.0: カスタムエンコードされた型を持つDataSetを集約する方法は?

// Generic fallback: derives a Kryo-based encoder for any type that has a ClassTag.
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

// Combines the encoders of both components into an encoder for the pair.
implicit def tuple2[A1, A2](implicit
    e1: Encoder[A1],
    e2: Encoder[A2]
): Encoder[(A1, A2)] = Encoders.tuple[A1, A2](e1, e2)

// Kryo-based encoder for LineString. The type is annotated explicitly because
// implicit definitions without a declared type make implicit resolution fragile
// (e.g. when the implicit is used before its definition in the same scope).
implicit val lineStringEncoder: Encoder[LineString] = Encoders.kryo[LineString]

// Builds one two-point LineString per input record and pairs it with the record id,
// then exposes the result as a typed Dataset with columns "id" and "segment".
// NOTE(review): `gf` is presumably a JTS GeometryFactory defined elsewhere — confirm.
val ds = segmentPoints.map { sp =>
    val p1 = new Coordinate(sp.lon_ini, sp.lat_ini)
    val p2 = new Coordinate(sp.lon_fin, sp.lat_fin)
    val coords = Array(p1, p2)

    (sp.id, gf.createLineString(coords))
  } // closes the map lambda, so toDF applies to the mapped collection, not to the tuple
  .toDF("id", "segment")
  .as[(Long, LineString)]


    | id |  segment  | 
    | 347|[01 00 63 6F 6D 2...| 
    | 347|[01 00 63 6F 6D 2...| 
    | 347|[01 00 63 6F 6D 2...| 
    | 808|[01 00 63 6F 6D 2...| 
    | 808|[01 00 63 6F 6D 2...| 
    | 808|[01 00 63 6F 6D 2...| 


// Typed aggregator that sums the lengths of LineString values.
// Input: LineString, intermediate buffer: Double, output: Double.
val length = new Aggregator[LineString, Double, Double] with Serializable {
    // The initial value.
    def zero: Double = 0
    // Add an element to the running total.
    def reduce(b: Double, a: LineString): Double = b + a.getLength
    // Merge intermediate values.
    def merge(b1: Double, b2: Double): Double = b1 + b2
    // The buffer already holds the final result.
    def finish(b: Double): Double = b
    // Following lines are missing on the API doc example but necessary to get
    // the code compile
    override def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
    override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}




|value         | 

同じIDを持つセグメントを処理するためにいくつかのUDAFを作成したいと考え、以下の2つの異なるアプローチを試しましたが、どちらも成功しませんでした。


Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [id#603L], [id#603L, anon$1([email protected], None, input[0, double, true] AS value#715, cast(value#715 as double), input[0, double, true] AS value#714, DoubleType, DoubleType)['segment] AS kms#721]; 


/**
 * Untyped user-defined aggregate function meant to sum the lengths of the
 * `LineString` values of a group.
 *
 * The input column is declared as binary and the aggregation buffer holds a
 * single running `Double` total, which is also the output.
 *
 * NOTE(review): `update` still contains the `???` placeholder, so it throws
 * `NotImplementedError` at runtime until the bytes are actually decoded into a
 * `LineString`.
 */
class Length extends UserDefinedAggregateFunction {
    // Kryo-based encoder for LineString, used when trying to decode the input bytes.
    val e: Encoder[LineString] = Encoders.kryo[LineString]

    // This is the input fields for your aggregate function.
    override def inputSchema: StructType = StructType(
        StructField("segment", DataTypes.BinaryType) :: Nil
    )

    // This is the internal fields you keep for computing your aggregate.
    override def bufferSchema: StructType = StructType(
        StructField("length", DoubleType) :: Nil
    )

    // This is the output type of your aggregatation function.
    override def dataType: DataType = DoubleType

    override def deterministic: Boolean = true

    // This is the initial value for your buffer schema.
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0.0
    }

    // This is how to update your buffer schema given an input.
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        // val l0 = input.getAs[LineString](0) // Can't cast to LineString (I guess because it is searialized using given encoder)
        val b = input.getAs[Array[Byte]](0) // This works fine
        val lse = e.asInstanceOf[ExpressionEncoder[LineString]]
        val ls = lse.fromRow(???) // it expects InternalRow but input is a Row instance
        // I also tried casting b.asInstance[InternalRow] without success.
        buffer(0) = buffer.getAs[Double](0) + ls.getLength
    }

    // This is how to merge two objects with the bufferSchema type.
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getAs[Double](0) + buffer2.getAs[Double](0)
    }

    // This is where you output the final value, given the final value of your bufferSchema.
    override def evaluate(buffer: Row): Any = {
        // NOTE(review): the original body was lost; returning the accumulated total
        // is the only value consistent with dataType = DoubleType — confirm.
        buffer.getAs[Double](0)
    }
}

// Instance of the Length UDAF declared above.
val length = new Length 

を使用していますか?RDDのgroupBy APIを使用する代わりに、カスタム型に対して集約APIを使用したいと考えています。Sparkのドキュメントを検索しましたが、この問題に対する答えを見つけることができませんでした。この機能は現時点ではまだ初期段階にあるようです。





1つの選択肢は、id: Long属性を追加してLineStringを拡張するcase class LineStringWithIDを定義すること、およびSQLImplicits

