2016-08-02 7 views
1

SpaseストリームコードでHBaseデータを読み込み、ストリーミングデータをさらに強化したいと考えています。私はspark-hbase-connector_2.10-1.0.3.jarを使用しています。私のコードでScalaのHBaseを読む - it.nerdammer

次の行には、右の数を返しますdocRdd.count成功

val docRdd = 
    sc.hbaseTable[(Option[String], Option[String])]("hbase_customer_profile") 
    .select("id","gender").inColumnFamily("data") 

です。

docRddはタイプ

のHBaseReaderBuilder(org.apache.spark.SparkContext @ 3a49e5、hbase_customer_profile、いくつかの(データ)、WrappedArray(ID、 性別)、なし、なし、リスト())

です

id, gender列のすべての行を読み込むにはどうすればよいですか。また、docRddをデータフレームに変換して、SparkSQLを使用することもできます。私は、行キーを追加した

case class Customer(rowKey: String, id: Option[String], gender: Option[String]) 

答えて

1

あなたはDataFrameRDDを変換するには

docRdd.collect().foreach(println) 

を使用してRDDからすべての行を読み取ることができ、あなたはケースクラスを定義することができますケースクラスに;これは厳密には必要ではないので、必要がなければ省略することができます。

: - - ケースクラスに基づいて spark-shellからの出力は次のようになります DataFrame

import sqlContext.implicits._ 
val df = rdd.toDF() 
df.show() 
df.printSchema() 

RDDを変換

// Row key, id, gender 
type Record = (String, Option[String], Option[String]) 

val rdd = 
    sc.hbaseTable[Record]("customers") 
    .select("id","gender") 
    .inColumnFamily("data") 
    .map(r => Customer(r._1, r._2, r._3)) 

、その後:RDD以上

その後map

scala> df.show() 
+---------+----+------+ 
| rowKey| id|gender| 
+---------+----+------+ 
|customer1| 1| null| 
|customer2|null|  f| 
|customer3| 3|  m| 
+---------+----+------+ 

scala> df.printSchema() 
root 
|-- rowKey: string (nullable = true) 
|-- id: string (nullable = true) 
|-- gender: string (nullable = true) 
+0

ありがとう@ベリリウム。私はこれを試してみる。私はSparkStreamでRDDを使いたいです。私はそれが同様にシリアライズ可能であることを望む。おかげさまでもう一度お返事ありがとうございます –

+0

この質問にはもう助けが必要ですか? – Beryllium

+0

私はこのすべてに設定されています。ありがとうございました.. –

関連する問題