2017-01-20 6 views
2

にデータを入れて、私はこの分野では初心者ですので、私はそれの感覚を得ることができない...号のHBase

  • のHBase版:0.98.24-hadoop2
  • スパークバージョン:2.1.0

次のコードは、Spark Streming-Kafkaプロデューサーからの受信データをHBaseに入れようとしています。 TAG2,134

スパークストリーミングプロセス「はデリミタによって受信ラインを分割し、

ライン1 TAG1,123
行1:

  • カフカ入力データフォーマットは次のようですHBaseにデータを入れてください。 しかし、htable.put()メソッドを呼び出すと、アプリケーションでエラーが発生しました。 次のコードがエラーを投げている理由を助けることができますか?

    ありがとうございます。

    JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() { 
        private static final long serialVersionUID = 7113426295831342436L; 
    
        HTable htable; 
        public HTable set() throws IOException{ 
         Configuration hconfig = HBaseConfiguration.create(); 
         hconfig.set("hbase.zookeeper.property.clientPort", "2222"); 
         hconfig.set("hbase.zookeeper.quorum", "127.0.0.1"); 
    
         HConnection hconn = HConnectionManager.createConnection(hconfig); 
    
         htable = new HTable(hconfig, tableName); 
    
         return htable; 
        }; 
        @Override 
        public Iterator<String> call(String x) throws IOException { 
    
         ////////////// Put into HBase ///////////////////// 
         String[] data = x.split(","); 
    
         if (null != data && data.length > 2){ 
          SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); 
          String ts = sdf.format(new Date()); 
    
          Put put = new Put(Bytes.toBytes(ts)); 
    
          put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0])); 
          put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1])); 
          put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2])); 
    
    /*I've checked data passed like this 
    {"totalColumns":3,"row":"20170120200927", 
    "families":{"TAGVALUE": 
    [{"qualifier":"LINEID","vlen":3,"tag[], "timestamp":9223372036854775807}, 
    {"qualifier":"TAGID","vlen":3,"tag":[],"timestamp":9223372036854775807}, 
    {"qualifier":"VAL","vlen":6,"tag" [],"timestamp":9223372036854775807}]}}*/ 
    
    
    //********************* ERROR *******************// 
          htable.put(put); 
          htable.close(); 
    
    
         } 
    
         return Arrays.asList(COLDELIM.split(x)).iterator(); 
        } 
    }); 
    

    ERROコード:

    Exception in thread "main" org.apache.spark.SparkException: Job 
    
    aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException 
    at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:154) 
    at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:123) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
    

答えて

2

あなたはhtableインスタンスを返すpublic HTable set() throws IOException このメソッドを呼び出していません。 htableインスタンス以来

がnullで、あなたはあなたの親切な助けのため

stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException 
+0

おかげで以下のようにNPEを取得している、ヌル

htable.put() 

上で操作をしようとしています。私は仕事を続けることができないという問題を解決しました... –