2016-06-22 11 views
2

私はCDH 5.5.2ディストリビューションからSpark 1.5.0を使用しています。 2.10.4からScala 2.10.5に切り替えました。私はUDAFのために次のコードを使用しています。これは何とかString対UTF8Stringの問題ですか?はいの場合は、どんな助けでも大歓迎です。スパークUDAF:java.lang.InternalError:不正なクラス名

object GroupConcat extends UserDefinedAggregateFunction { 
    def inputSchema = new StructType().add("x", StringType) 
    def bufferSchema = new StructType().add("buff", ArrayType(StringType)) 
    def dataType = StringType 
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = { 
     buffer.update(0, ArrayBuffer.empty[String]) 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) = { 
     if (!input.isNullAt(0)) 
     buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0)) 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
     buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)) 
    } 

    def evaluate(buffer: Row) = UTF8String.fromString(
     buffer.getSeq[String](0).mkString(",")) 
} 

はしかし、私は、実行時にこのエラーメッセージが表示されます。

Exception in thread "main" java.lang.InternalError: Malformed class name 
at java.lang.Class.getSimpleName(Class.java:1190) 
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.toString(udaf.scala:464) 
at java.lang.String.valueOf(String.java:2847) 
at java.lang.StringBuilder.append(StringBuilder.java:128) 
at scala.StringContext.standardInterpolator(StringContext.scala:122) 
at scala.StringContext.s(StringContext.scala:90) 
at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2.toString(interfaces.scala:96) 
at org.apache.spark.sql.catalyst.expressions.Expression.prettyString(Expression.scala:174) 
at org.apache.spark.sql.GroupedData$$anonfun$1.apply(GroupedData.scala:86) 
at org.apache.spark.sql.GroupedData$$anonfun$1.apply(GroupedData.scala:80) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
at org.apache.spark.sql.GroupedData.toDF(GroupedData.scala:80) 
at org.apache.spark.sql.GroupedData.agg(GroupedData.scala:227) 

答えて

0

UserDefinedAggregateFunctionを拡張して、私のオブジェクトが別の関数の中にあったので、私は同じ例外を受け取りました。

変更この:これに

object Driver { 
    def main(args: Array[String]) { 

     object GroupConcat extends UserDefinedAggregateFunction { 
      ... 
     } 
    } 
} 

object Driver { 
    def main(args: Array[String]) { 
     ... 
    } 

    object GroupConcat extends UserDefinedAggregateFunction { 
     ... 
    } 
} 
0

私が輸入されたパッケージとの競合としてこれに走りました。何かをインポートしている場合は、何もインポートされていないスパークシェルでテストしてみてください。

あなたのUDAFを定義するときに、返される名前がどのようなものかを確認します。

FractionOfDayCoverage: org.apache.spark.sql.expressions.UserDefinedAggregateFunction{def dataType: org.apache.spark.sql.types.DoubleType.type; def evaluate(buffer: org.apache.spark.sql.Row): Double} = [email protected] 

最後に$ anon $ 1 @ 27506b6dはうまくいくでしょう。競合するパッケージをインポートしたとき、返される名前は3倍長くなりました。ここに例があります:

[email protected] 
関連する問題