2017-02-23 5 views
1

HBDDから読み込んだRDDをキャッシュして更新する方法としてalluxioメモリファイルシステムにシリアライズする必要がありますインクリメンタルSPARK計算で定期的に使用されます。saveAsObjectでRDDを保存する、例外 "シリアライズできない結果がありました:org.apache.hadoop.hbase.io.ImmutableBytesWritable"

コードは次のようにありますが、題し例外

val inputTableNameEvent = HBaseTables.TABLE_XXX.tableName 
val namedeRDDName = "EventAllCached2Update" 
val alluxioPath = "alluxio://hadoop1:19998/" 
val fileURI = alluxioPath + namedeRDDName 
val path:AlluxioURI = new AlluxioURI("/"+namedeRDDName) 

val fs:FileSystem = FileSystem.Factory.get() 

val conf = HBaseConfiguration.create() 
conf.set(TableInputFormat.INPUT_TABLE, inputTableNameEvent) 

val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], 
       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
       classOf[org.apache.hadoop.hbase.client.Result]) 
numbers = rdd.count() 
println("rdd count: " + numbers) 
if(fs.exists(path)) 
     fs.delete(path) 
rdd.saveAsObjectFile(fileURI) 

に実行する誰もがこの問題を回避するために別の型にImmutableBytesWritableをマッピングする方法を教えて助けることができますか?また、後で私はobjectFileを使用してこの保存されたオブジェクトを読み込み、後で更新と計算に使用する[(ImmutableBytesWritable、Result)] RDDにする必要があるため、マップを復元可能にする必要があります。

答えて

0

rddを行オブジェクトに変換する必要があります。下記のようなものをhdfsに保存します。解析されたRDDは、データを持つ他のrddと同じです。

val parsedRDD = yourRDD.map(tuple => tuple._2).map(result => (
     Row((result.getRow.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column1".getBytes()).get(0).getValue.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column2".getBytes()).get(0).getValue.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column3".getBytes()).get(0).getValue.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column4".getBytes()).get(0).getValue.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column5".getBytes()).get(0).getValue.map(_.toChar).mkString), 
     (result.getColumn("CF".getBytes(),"column5".getBytes()).get(0).getValue.map(_.toChar).mkString) 
    ))) 
関連する問題