2017-12-16 9 views
1

私はKafkaからデータ(json文字列)を読み込み、それを解析して与えられたスキーマを持つデータセットに変換し、そのデータセットの属性に対する集計を実行します。私はこのデータセットをjson文字列に変換してからhbaseに格納したいと思っています。これを行う最善の方法は何ですか? サンプルデータセット:HBaseの中に私のキーに対するhbaseの行のデータセットを保存する

id|name|age 

1 |geet|21 

予想される出力:

{"id":"1","name":"geet","age":"21"} 
+0

..

、ご希望の形式でのHBaseにしてJSONを格納するforeachPartitionと、以下のexample of SparkOnHbaseのコードを実行します(あなたのJSON以外の)一般的な例を与えているのはなぜあなたをしていますHbaseにjsonを格納していますか? –

答えて

0

あなたが戻って別のカフカトピックに得られたデータを書き込み、そのデータを書き込むことがhttps://github.com/landoop/stream-reactorでカサンドラカフカコネクターが使用可能に使用することができますキャッサンドラへ

2

スパークはHBASEのシンクを提供しません。あなたはHortonworksによって提供されたspark-hbaseコネクタを試すことができます。以下のようなRDD/DF/DS、

hbaseout.forEachPartition { record => 
record.ForEach { 
//hbase write code goes here 
} 
} 
1

オーバーエルスあなたができたループがこれを行うための最善の方法は何ですか?

Hbase connectorは、Hortonworksディストリビューションで使用している場合にのみ利用可能です。

私は


package org.apache.hadoop.hbase.spark.example.rdd 
import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} 
import org.apache.hadoop.hbase.spark.HBaseContext 
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.spark.{SparkContext, SparkConf} 
/** 
* This is a simple example of using the foreachPartition 
* method with a HBase connection 
*/ 
object HBaseForeachPartitionExample { 
def main(args: Array[String]) { 
if (args.length < 2) { 
println("HBaseBulkPutExample {tableName} {columnFamily}") 
return 
} 
val tableName = args(0) 
val columnFamily = args(1) 
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + 
tableName + " " + columnFamily) 
val sc = new SparkContext(sparkConf) 
try { 
//[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])] 
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"), 
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), 
(Bytes.toBytes("2"), 
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), 
(Bytes.toBytes("3"), 
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), 
(Bytes.toBytes("4"), 
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), 
(Bytes.toBytes("5"), 
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))) 
)) 
val conf = HBaseConfiguration.create() 
val hbaseContext = new HBaseContext(sc, conf) 
rdd.hbaseForeachPartition(hbaseContext, 
(it, connection) => { 
val m = connection.getBufferedMutator(TableName.valueOf(tableName)) 
it.foreach(r => { 
val put = new Put(r._1) 
r._2.foreach((putValue) => 
put.addColumn(putValue._1, putValue._2, putValue._3)) 
m.mutate(put) 
}) 
m.flush() 
m.close() 
}) 
} finally { 
sc.stop() 
} 
} 
} 
+0

あなたはどんな答えでも大丈夫なら[s]答えを受け入れてください..それは役に立ちましたか? – user3190018

関連する問題