2017-06-26 9 views
1

SparkおよびHBaseの新機能です。私はHBaseテーブルのバックアップを取っています。これらのバックアップはS3バケットにあります。HBaseカラムに保存されたAVRO構造体を読み取る

conf.set("io.serializations", "org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.hbase.mapreduce.ResultSerialization") 
val data = sc.newAPIHadoopFile(path,classOf[SequenceFileInputFormat[ImmutableBytesWritable, Result]], classOf[ImmutableBytesWritable], classOf[Result], conf) 

問題のテーブルがEMPSと呼ばれている:私はこのようなnewAPIHadoopFileを用いたスパーク(スカラ)を介して、それらを読んでいます。 Empsのスキーマは次のとおりです。

key: empid {COMPRESSION => 'gz' } 
    family: data 
    dob - date of birth of this employee. 
    e_info - avro structure for storing emp info. 
    e_dept- avro structure for storing info about dept. 

    family: extra - Extra Metadata {NAME => 'extra', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'SNAPPY', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} 
    e_region - emp region 
    e_status - some data about his achievements 
    . 
    . 
    some more meta data 

表には、単純な文字列データを持つ列と、AVRO構造を持つ列があります。

私はこのデータをS3のHBaseバックアップファイルから直接読み込もうとしています。私はテーブルが非常に大きいので、私のローカルマシンでこのHBaseテーブルを再作成したくありません。

これは私がこれを読んしようとしている方法です:期待

data.keys.map{k=>(new String(k.get()))}.take(1) 
res1: Array[String] = Array(111111111100011010102462) 

data.values.map{ v =>{ for(cell <- v.rawCells()) yield{ 
         val family = CellUtil.cloneFamily(cell); 
         val column = CellUtil.cloneQualifier(cell); 
         val value = CellUtil.cloneValue(cell); 
          new String(family) +"->"+ new String(column)+ "->"+ new String(value) 
         } 
         } 
}.take(1) 
res2: Array[Array[String]] = Array(Array(info->dob->01/01/1996, info->e_info->?ж�?�ո� ?�� ???̶�?�ո� ?�� ????, info->e_dept->?ж�??�ո� ?̶�??�ո� �ո� ??, extra->e_region-> CA, extra->e_status->, .....)) 

として、私は正しく、単純な文字列データを見ることができますが、AVROデータがゴミです。

私はGenericDatumReaderを使用してAVRO構造読んでみました:

data.values.map{ v =>{ for(cell <- v.rawCells()) yield{ 
         val family = new String(CellUtil.cloneFamily(cell)); 
         val column = new String(CellUtil.cloneQualifier(cell)); 
         val value = CellUtil.cloneValue(cell); 
         if(column=="e_info"){ 
          var schema_obj = new Schema.Parser 
          //schema_e_info contains the AVRO schema for e_info 
          var schema = schema_obj.parse(schema_e_info) 
          var READER2 = new GenericDatumReader[GenericRecord](schema) 
          var datum= READER2.read(null, DecoderFactory.defaultFactory.createBinaryDecoder(value,null)) 
          var result=datum.get("type").toString() 
           family +"->"+column+ "->"+ new String(result) + "\n" 
          } 
         else 
          family +"->"+column+ "->"+ new String(value)+"\n" 
         } 
       }   

} 

しかし、これは私に次のエラーを与えている:

  1. を:

    org.apache.spark.SparkException: Task not serializable 
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2101) 
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) 
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) 
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
        at org.apache.spark.rdd.RDD.map(RDD.scala:369) 
        ... 74 elided 
    Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema 
    Serialization stack: 
        - object not serializable (class: org.apache.avro.Schema$RecordSchema, value: ..... 
    

    は、だから私は聞きしたいの非直列化可能クラスを作る方法はありますかRecordSchemaはマップ関数で動作しますか?

  2. 私のアプローチは今のポイントですか?私はこの種のデータを扱うより良いアプローチについて知ってうれしいです。
  3. 私はこれをDataframeで処理する方がはるかに簡単だと読んでいます。私はこのように形成されたHadoop RDDをDataframeに変換しようとしましたが、そこでは盲目的に実行しています。

答えて

0

例外として、スキーマは非直列化可能です。マッパー関数内で初期化できますか?ドライバーからエグゼクターに出荷される必要はありません。

また、スキーマを含むスカラシングルトンオブジェクトを作成することもできます。各エグゼキュータで初期化された1つのスカラシングルトンを取得するので、シングルトンから任意のメンバーにアクセスするときに、ネットワーク経由で&がシリアル化される必要はありません。これにより、データの各行ごとにスキーマを再作成するという不必要なオーバーヘッドが回避されます。

データを細かく読み取ることができることを確認する目的で、エグゼキュータ上のバイト配列に変換し、ドライバで収集して、ドライバコードでデシリアライズ(AVROデータの解析)を実行することもできます。しかし、これは明らかに拡張性はありません。データを抽出するためのプロトタイプコードを作成している間は、データが良好に見えるようにし、スパーク関連の合併症を避けるだけです。

関連する問題