2017-04-18 7 views
0

私はhbaseテーブルを作成し、spark core(spark streaming after)を使用して挿入しようとしています。 私は、私はこの問題なった場合でも、テーブルを作成し、そこにデータを追加することに成功した:spark 1.3 hbaseエラーで遊ぶ

warning: Class org.apache.hadoop.hbase.classification.InterfaceAudience not found - continuing with a stub. 

をしかし、私はエラーを得たカウントしようとすると、誰かが最初の警告で私を助けて、私は、このテーブルにストリーミングデータを追加傾ける方法

私のコードは怒鳴るある可能性があります

  import org.apache.spark._ 
      import org.apache.spark.rdd.NewHadoopRDD 
      import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} 
      import org.apache.hadoop.hbase.client.HBaseAdmin 
      import org.apache.hadoop.hbase.mapreduce.TableInputFormat 
      import org.apache.hadoop.fs.Path; 
      import org.apache.hadoop.hbase.HColumnDescriptor 
      import org.apache.hadoop.hbase.util.Bytes 
      import org.apache.hadoop.hbase.client.Put; 
      import org.apache.hadoop.hbase.client.HTable; 
      import org.apache.hadoop.hbase.mapred.TableOutputFormat 
      import org.apache.hadoop.mapred.JobConf 
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable 
      import org.apache.hadoop.mapreduce.Job 
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat 
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat 
      import org.apache.hadoop.hbase.KeyValue 
      import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat 
      import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 
     val tableName = "ziedspark" 
     val conf = HBaseConfiguration.create() 
     conf.addResource(new Path("file:///opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/etc/hbase/conf.dist/hbase-site.xml")) 
    conf.set(TableInputFormat.INPUT_TABLE, tableName) 
    val admin = new HBaseAdmin(conf) 
     if(!admin.isTableAvailable(tableName)) { 
      print("Creating GHbase Table Creating GHbase Table Creating GHbase Table Creating GHbase Table ") 
      val tableDesc = new HTableDescriptor(tableName) 
      tableDesc.addFamily(new HColumnDescriptor("z1".getBytes())) 
      tableDesc.addFamily(new HColumnDescriptor("z2".getBytes())) 
      admin.createTable(tableDesc) 

     }else{ 
      print("Table already exists!!") 
     } 
val myTable = new HTable(conf, tableName) 
    for (i <- 414540 to 414545) { 

     var p = new Put(Bytes.toBytes(""+i)) 
     p.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(""+(i*5))) 
     p.add("z1".getBytes(), "age".getBytes(), Bytes.toBytes("2016-07-01")) 
     p.add("z2".getBytes(), "job".getBytes(), Bytes.toBytes(""+i)) 
     p.add("z2".getBytes(), "salary".getBytes(), Bytes.toBytes(""+i)) 
     myTable.put(p) 
    } 

    myTable.flushCommits() 


     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], 
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
classOf[org.apache.hadoop.hbase.client.Result]) 

      //error here after creating the table count is not working 
     val count = hBaseRDD.count() 
    print("HBase RDD count:" + count) 
    System.exit(0) 

答えて

0

スパークから読書に関連する同様の質問を見つけてください。

How to read from hbase using spark

も言及したライブラリで、あなたが読んで、HBaseの中に書き込むためのスタブを取得します。

これ以上のヘルプがある場合はお知らせください。

+0

私はこのインポートに問題があります: 'import it.nerdammer.spark.hbase._'私はすべてのjarファイルを追加します。私はスパークシェルを使用しています –

+0

あなたはspark-shellコマンドも共有できますか? –

+0

'spark-shell --jars spark-streaming-kafka_2.10-1.3.0.jar kafka_2.10-0.8.0.jar spark-hbase-connector_2.10-1.0.3 \(2 \)。jar' –

関連する問題