2015-09-17 18 views
6

カスタムクラスを列として含むSpark DataFrameを(寄木張りファイルとして)保存します。このクラスは別のカスタムクラスのSeqによって構成されます。これを行うために、私はVectorUDTと同様に、これらのクラスのそれぞれに対してUserDefinedTypeクラスを作成します。私は意図したようにデータフレームを扱うことができますが、寄木細工(またはjason)としてディスクに保存することはできません 私はバグとして報告しましたが、おそらくコードに問題があります。私は、問題を示すために、単純な例を実装しました:ネストされたユーザーデータ型を使用したSpark DataFrameの保存

import org.apache.spark.sql.SaveMode 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.catalyst.InternalRow 
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow 
import org.apache.spark.sql.types._ 

@SQLUserDefinedType(udt = classOf[AUDT]) 
case class A(list:Seq[B]) 

class AUDT extends UserDefinedType[A] { 
    override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true))) 
    override def userClass: Class[A] = classOf[A] 
    override def serialize(obj: Any): Any = obj match { 
    case A(list) => 
     val row = new GenericMutableRow(1) 
     row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray)) 
     row 
    } 

    override def deserialize(datum: Any): A = { 
    datum match { 
     case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq) 
    } 
    } 
} 

object AUDT extends AUDT 

@SQLUserDefinedType(udt = classOf[BUDT]) 
case class B(num:Int) 

class BUDT extends UserDefinedType[B] { 
    override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false))) 
    override def userClass: Class[B] = classOf[B] 
    override def serialize(obj: Any): Any = obj match { 
    case B(num) => 
     val row = new GenericMutableRow(1) 
     row.setInt(0, num) 
     row 
    } 

    override def deserialize(datum: Any): B = { 
    datum match { 
     case row: InternalRow => new B(row.getInt(0)) 
    } 
    } 
} 

object BUDT extends BUDT 

object TestNested { 
    def main(args:Array[String]) = { 
    val col = Seq(new A(Seq(new B(1), new B(2))), 
        new A(Seq(new B(3), new B(4)))) 

    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    val df = sc.parallelize(1 to 2 zip col).toDF() 
    df.show() 

    df.write.mode(SaveMode.Overwrite).save(...) 
    } 
} 

これは、次のエラーが発生:

15/09/16 16:44:39 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) java.lang.IllegalArgumentException: Nested type should be repeated: required group array { required int32 num; } at org.apache.parquet.schema.ConversionPatterns.listWrapper(ConversionPatterns.java:42) at org.apache.parquet.schema.ConversionPatterns.listType(ConversionPatterns.java:97) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:460) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:522) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:521) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:521) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:526) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.types.StructType.foreach(StructType.scala:92) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at org.apache.spark.sql.types.StructType.map(StructType.scala:92) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convert(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypesConverter.scala:58) at org.apache.spark.sql.execution.datasources.parquet.RowWriteSupport.init(ParquetTableSupport.scala:55) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetRelation.scala:94) at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:234) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/09/16 16:44:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost):

Bとのデータフレームの代わりに、何の問題を保存するには何としてB以来、存在しない場合ネストされたカスタムクラス。何か不足していますか?

答えて

2

コードを4つ変更して動作させる必要がありました(LinuxのSpark 1.6.0でテスト済み)。のほとんどはがなぜ必要なのかを説明することができます。しかし、もっと簡単な解決策があるのか​​どうか、私には疑問が感じられます。

  1. sqlTypeを定義し、それはむしろちょうどBUDTよりも、BUDT.sqlTypeに依存します:次のようにすべての変更は、AUDTです。
  2. serialize()には、各リスト要素のBUDT.serialize()を呼び出します。 deserialize()
  3. class AUDT extends UserDefinedType[A] { 
        override def sqlType: DataType = 
        StructType(
         Seq(StructField("list", 
             ArrayType(BUDT.sqlType, containsNull = false), 
             nullable = true))) 
    
        override def userClass: Class[A] = classOf[A] 
    
        override def serialize(obj: Any): Any = 
        obj match { 
         case A(list) => 
         val row = new GenericMutableRow(1) 
         val elements = 
          list.map(_.asInstanceOf[Any]) 
           .map(e => BUDT.serialize(e)) 
           .toArray 
         row.update(0, new GenericArrayData(elements)) 
         row 
        } 
    
        override def deserialize(datum: Any): A = { 
        datum match { 
         case row: InternalRow => 
         val first = row.getArray(0) 
         val bs:Array[InternalRow] = first.toArray(BUDT.sqlType) 
         val bseq = bs.toSeq.map(e => BUDT.deserialize(e)) 
         val a = new A(bseq) 
         a 
        } 
        } 
    
    } 
    

    全4個のCH:すべての要素ここで

  • コールtoArray(BUDT.sqlType)代わりのtoArray(BUDT)
  • コールBUDT.deserialize()は、得られたコードですハンドルの種類がAである場合とBの場合の処理​​の関係は、スキーマの型指定、直列化および逆シリアル化のために非常に明示的になりました。元のコードは、Spark SQLが「それを理解する」という前提に基づいているようですが、これは妥当かもしれませんが、明らかにそうではありません。

+0

これはうまくいきました。私は、DataFrameの列として "複雑な"オブジェクトを使用しています.Spark 1.6.0では、動作が停止しました。これはトリックでしたので、私が学んだ教訓は、シリアライゼーション/デシリアライゼーションに関するすべてを非常に明示的にすることです。乾杯! –

関連する問題