2016-09-05 4 views
0

私は大きなhbaseデータセットを持っており、デバッグするサンプルセットを得るために特定の条件でデータを取得したいと思います。spark filter hbaseサンプルを取得する

val conf = HBaseConfiguration.create() 
conf.set("hbase.zookeeper.quorum", "172.16.1.10,172.16.1.11,172.16.1.12") 
conf.setInt("timeout", 120000) 
conf.set(TableInputFormat.INPUT_TABLE, "dateset") 
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) 
val filteredRDD = rdd.filter{ 
    case tuple => { 
     val result = tuple._2 
     val hostId = new String(result.getValue("user", "id")) 
     hostId == "12345" // <-- only retrieve the row when user:id is 12345 
    } 
} 

は、今私はfilteredRDDのRDDを取得し、私は別のテーブル

val conf = HBaseConfiguration.create() 
conf.set("hbase.zookeeper.quorum", "172.16.1.10,172.16.1.11,172.16.1.12") 
conf.setInt("timeout", 120000) 
conf.set(TableOutputFormat.OUTPUT_TABLE, "data_sample") 
// here I don't know which api to use 

でき誰かに戻って同じ形式としてそれを保存したい:ここ

は、Spark RDDとフィルタであり、私に手がかりを与える?ありがとう。

+0

を私はScalaの専門家ではないよと、以下のJavaで同じことを行う方法を答えています。うまくいけばScalaにとってもこれが存在することを願っています。 –

+0

あなたのコードは 'filteredRDD'で確実に動作しますか?それは私のために働いていません。 'getValue'メソッドは文字列の代わりに' Array [Byte] 'を必要とします。 – Explorer

答えて

0

あなたは次のようにRDDペアにsaveAsNewAPIHadoopDataset機能を使用することができます。

JavaPairRDD<ImmutableBytesWritable, Put> pairs = filterRDD.mapToPair(new PairFunction<Object, ImmutableBytesWritable, Put>() { 
    @Override 
    public Tuple2<ImmutableBytesWritable, Put> execute(Object message) throws Exception { 

     //prepare the HBase put here 
     return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put); 
    } 
}); 

Job newAPIJobConfiguration = Job.getInstance(hbaseConf); 
newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName); 
newAPIJobConfiguration.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class); 

pairs.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration()); 
関連する問題