2017-01-18 20 views
0

SparkSQLを使用してHBaseからデータを取得し、SparkSQLのようにデータの先頭をクエリする必要があります。次のように私がやったHBaseからデータを取得中にRDDでNullデータを取得する

シング:

  1. 作成したスパークconfのオブジェクト
  2. 作成したHBaseのオブジェクト
  3. は、レコードをフェッチするJAVPairRDDを書きました。

私のメインクラスのコードは次のとおりです。

import java.io.Serializable; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapreduce.TableInputFormat; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.examples.sql.JavaSparkSQL; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.hive.HiveContext; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.apache.spark.api.java.function.Function; 

import scala.Tuple2; 

import com.lsr.LSRTar; 
import com.lsr.utils.*; 
import com.spark.hbase.TestData; 

public class SparkSQLHBaseMain extends SparkJob implements Serializable{ 
    public static final String APPNAME = "Spark-SQL HBase Application"; 

    public static Logger logger = LoggerFactory.getLogger(SparkSQLHBaseMain.class); 

    public static void main(String[] args) { 
     logger.info("Calling method runJob()"); 
     new SparkSQLHBaseMain().runJob(); 
    } 

    public void runJob() { 
     final JavaSparkContext javaSparkContext = getSparkContext(APPNAME); 
     logger.info("Spark Object created !!!"); 
     SQLContext sqlContext = new SQLContext(javaSparkContext); 
     Configuration hbaseConfig = HBaseUtils.getHBaseConf(); 

     hbaseConfig.set(TableInputFormat.INPUT_TABLE, "emp"); 
     hbaseConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "a"); // column family 
     hbaseConfig.set(TableInputFormat.SCAN_COLUMNS, "a:id a:name"); 

     JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = 
        javaSparkContext.newAPIHadoopRDD(hbaseConfig, TableInputFormat.class, ImmutableBytesWritable.class, Result.class); 

     JavaRDD<TestData> rowPairRDD = hBaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, TestData>() { 
        public TestData call(Tuple2<ImmutableBytesWritable, Result> entry) throws Exception { 
         TestData cd = new TestData(); 
         Result r = entry._2; 
         String keyRow = Bytes.toString(r.getRow()); 
         cd.setRowkey(keyRow); 
         cd.setId((String) Bytes.toString(r.getValue(Bytes.toBytes("a"), Bytes.toBytes("id")))); 
         cd.setName((String) Bytes.toString(r.getValue(Bytes.toBytes("a"), Bytes.toBytes("name")))); 
         return cd; 
        } 
       }); 
     System.out.println("Result : \n"+"ID : "+rowPairRDD.id()+"Name : "+rowPairRDD.name()); 

     DataFrame dataFrame = sqlContext.createDataFrame(rowPairRDD, TestData.class); 
     dataFrame.show(); 
     dataFrame.cache(); 
     dataFrame.repartition(100); 
     dataFrame.printSchema(); 
    } 

} 
次のように私のBeanクラスのコードがある

package com.spark.hbase; 

import java.io.Serializable; 

public class TestData extends java.lang.Object implements Serializable{ 
    /** 
    * 
    */ 
    private static final long serialVersionUID = 1L; 
    String keyRow; 
    String id; 
    String name; 
    public void setRowkey(String keyRow) { 
     this.keyRow = keyRow; 
    } 

    public void setId(String id) { 
     this.id = id; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

} 

が例外を下回る行き方:

17/01/18 12:28:54 INFO HBaseUtils: HBase is running !!! 
HBase is running! 
17/01/18 12:28:56 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 214.5 KB, free 214.5 KB) 
17/01/18 12:28:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.0 KB, free 239.5 KB) 
17/01/18 12:28:57 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43929 (size: 25.0 KB, free: 257.8 MB) 
17/01/18 12:28:57 INFO SparkContext: Created broadcast 0 from newAPIHadoopRDD at SparkSQLHBaseMain.java:49 
Result : 
ID : 1Name : null 
Exception in thread "main" java.lang.NullPointerException 
    at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465) 
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:110) 
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:109) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) 
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:109) 
    at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:54) 
    at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:941) 
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:572) 
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:591) 
    at com.ril.jio.spark.sql.SparkSQLHBaseMain.runJob(SparkSQLHBaseMain.java:63) 
    at com.ril.jio.spark.sql.SparkSQLHBaseMain.main(SparkSQLHBaseMain.java:35) 

マイスパーク&正しく動作するHBase。

この問題を解決するのを手伝ってください。

+0

の可能性の重複を読むためにRDDに反復するために使用foreach()rowPairRDD

JavaRDD<TestData> rowPairRDD = .... 

後(未テスト)

[NullPointerExceptionとは何ですか?どうすれば修正できますか?](http://stackoverflow.com/questions/218384/what-is-a-nullpointerexception-and-how-do-i-fix-it) –

答えて

0

私はあなたのコードは以下のようなものであるべきだと思う:記録

rowPairRDD.foreach(new VoidFunction<TestData>() { 
    public void call(TestData entry) { 
     System.out.println("Result : ID : " + entry.id() + " Name : " + entry.name()); 
    } 
} 
関連する問題