2016-08-22 2 views
3

python-apiでQualiferFilterのようなフィルタを使用してHBaseから行を取得したいとします。
私はHBaseからコードのような行を取得する方法を知っています。Spark:python-apiでQualaseFilterなどのHBaseフィルタを使用する方法

host = 'localhost' 
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" 
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" 
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": "user", 
       "hbase.mapreduce.scan.columns": "u:uid", 
       "hbase.mapreduce.scan.row.start": "1", "hbase.mapreduce.scan.row.stop": "100"} 
rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat", 
          "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
         "org.apache.hadoop.hbase.client.Result", 
         keyConverter=keyConv, valueConverter=valueConv, conf=conf) 

しかし、フィルタを使用して行を取得したいと考えています。
追加する必要があるコードのタイプは何ですか?

+0

あなたがこれを行う方法を見つけ出すのですか? – void

+0

解決策が見つかりませんでした。結局、私はScala APIを使用します。私は、Python APIは本番環境ではまだ使用できないと思います。 – penlight

答えて

0

こんにちはあなたはこのコードを確認することができます................

def doYourStuff(row): 
    text = row.split("\n") 
    data = {} 
    for row in text: 
     if json.loads(row)["qualifier"] == "message": 
       data["message"] = json.loads(row)["value"] 
     if json.loads(row)["qualifier"] == "domain": 
       data["domain"] = json.loads(row)["value"] 
     data["rowKey"] = json.loads(row)["row"] 
     return DoWhatYouWantToDo(data) 

    def save_record(rdd): 
     host = '[email protected]@[email protected]@' 
     table = 'TableName' 
     keyConv1 = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" 
     valueConv1 = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" 
     conf = {"hbase.zookeeper.quorum": host, 
       "hbase.mapred.outputtable": table, 
       "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", 
       "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
       "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} 
     rdd.saveAsNewAPIHadoopDataset(
      keyConverter=keyConv1, valueConverter=valueConv1,conf=conf) 


    hbaseRdd = hbaseRdd.map(lambda x: x[1]) # message_rdd = hbase_rdd.map(lambda x:x[0]) will give only row-key 

    processedRdd = hbaseRdd.map(lambda x: doYourStuff(x)) 
    save_record(processedRdd) 
関連する問題