3

私はセグメント列上の任意の地図操作を適用して、基礎となるLineStrign方法を使用することができたLineStringSpark 2.0.0:DataSetをカスタムエンコードされた型に集約する方法は?

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c) 
implicit def tuple2[A1, A2](implicit 
          e1: Encoder[A1], 
          e2: Encoder[A2] 
          ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2) 
implicit val lineStringEncoder = Encoders.kryo[LineString] 

val ds = segmentPoints.map(
    sp => { 
    val p1 = new Coordinate(sp.lon_ini, sp.lat_ini) 
    val p2 = new Coordinate(sp.lon_fin, sp.lat_fin) 
    val coords = Array(p1, p2) 

    (sp.id, gf.createLineString(coords)) 
    }) 
    .toDF("id", "segment") 
    .as[(Long, LineString)] 
    .cache 

ds.show 

    +----+--------------------+ 
    | id |  segment  | 
    +----+--------------------+ 
    | 347|[01 00 63 6F 6D 2...| 
    | 347|[01 00 63 6F 6D 2...| 
    | 347|[01 00 63 6F 6D 2...| 
    | 808|[01 00 63 6F 6D 2...| 
    | 808|[01 00 63 6F 6D 2...| 
    | 808|[01 00 63 6F 6D 2...| 
    +----+--------------------+ 

ためkryoエンコーダとタプルエンコーダを使用して、[(ロング、ラインストリング)]データセットとして格納されているいくつかのデータを持っています。

val length = new Aggregator[LineString, Double, Double] with Serializable { 
    def zero: Double = 0      // The initial value. 
    def reduce(b: Double, a: LineString) = b + a.getLength // Add an element to the running total 
    def merge(b1: Double, b2: Double) = b1 + b2 // Merge intermediate values. 
    def finish(b: Double) = b 
    // Following lines are missing on the API doc example but necessary to get 
    // the code compile 
    override def bufferEncoder: Encoder[Double] = Encoders.scalaDouble 
    override def outputEncoder: Encoder[Double] = Encoders.scalaDouble 
}.toColumn 

ds.groupBy("id) 
    .agg(length(col("segment")).as("kms")) 
    .show(false) 
:アグリゲータを使用して)

1:

ds.map(_._2.getClass.getName).show(false) 

+--------------------------------------+ 
|value         | 
+--------------------------------------+ 
|com.vividsolutions.jts.geom.LineString| 
|com.vividsolutions.jts.geom.LineString| 
|com.vividsolutions.jts.geom.LineString| 

Iは、同じIDを持つセグメントを処理するために、いくつかのUDAFsを作成したいと思い、私は成功せずfolling 2つの異なるアプローチを試みました

ここで次のエラーが発生します。

Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [id#603L], [id#603L, anon$1([email protected], None, input[0, double, true] AS value#715, cast(value#715 as double), input[0, double, true] AS value#714, DoubleType, DoubleType)['segment] AS kms#721]; 

2)私が間違って何をやっているUserDefinedAggregateFunction

class Length extends UserDefinedAggregateFunction { 
    val e = Encoders.kryo[LineString] 

    // This is the input fields for your aggregate function. 
    override def inputSchema: StructType = StructType(
    StructField("segment", DataTypes.BinaryType) :: Nil 
) 

    // This is the internal fields you keep for computing your aggregate. 
    override def bufferSchema: StructType = StructType(
     StructField("length", DoubleType) :: Nil 
) 

    // This is the output type of your aggregatation function. 
    override def dataType: DataType = DoubleType 

    override def deterministic: Boolean = true 

    // This is the initial value for your buffer schema. 
    override def initialize(buffer: MutableAggregationBuffer): Unit = { 
    buffer(0) = 0.0 
    } 

    // This is how to update your buffer schema given an input. 
    override def update(buffer : MutableAggregationBuffer, input : Row) : Unit = { 
    // val l0 = input.getAs[LineString](0) // Can't cast to LineString (I guess because it is searialized using given encoder) 
    val b = input.getAs[Array[Byte]](0) // This works fine 
    val lse = e.asInstanceOf[ExpressionEncoder[LineString]] 
    val ls = lse.fromRow(???) // it expects InternalRow but input is a Row instance 
    // I also tried casting b.asInstance[InternalRow] without success. 
    buffer(0) = buffer.getAs[Double](0) + ls.getLength 
    } 

    // This is how to merge two objects with the bufferSchema type. 
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 
    buffer1(0) = buffer1.getAs[Double](0) + buffer2.getAs[Double](0) 
    } 

    // This is where you output the final value, given the final value of your bufferSchema. 
    override def evaluate(buffer: Row): Any = { 
    buffer.getDouble(0) 
    } 
} 

val length = new Length 
rseg 
    .groupBy("id") 
    .agg(length(col("segment")).as("kms")) 
    .show(false) 

を使用していますか?私は、rdd groupBy APIを使用する代わりに、カスタムタイプの集約APIを使用したいと思います。私はSparkの文書を検索しましたが、この問題に対する答えを見つけることができませんでした。現時点では早い段階にあるようです。

ありがとうございました。

答えて

0

このanswerによれば、ネストされたタイプのカスタムエンコーダ、つまりlike(Long、LineString)を簡単な方法で渡すことはできません。

1つのオプションは、id: Long属性でLineStringを延長するcase class LineStringWithIDを定義すること、およびSQLImplicits

P.S.からエンコーダを使用することができますあなたはあなたの質問を小さなトピック、それぞれ1つのトピックに分解できますか?

関連する問題