私はKafkaからデータを受け取り、Hbaseテーブルに書き込むプロジェクトを作成しています。レコードの差分を知りたいので、最初にHbaseで同じ行キーを使用してレコードを取得し、受信レコードで減算を行い、最終的に新しいレコードをHBaseテーブルに保存する必要があります。Spark StreamingでHbaseデータを読み取る
最初に、newAPIHadoop
を使用してhbaseからデータを取得しようとしました。ここに私の試みは次のとおりです。
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", zkQuorum)
conf.set("hbase.master", masterAddr)
conf.set("hbase.zookeeper.property.clientPort", portNum)
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName)
val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf,
classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
このところで、私は一度だけ特定の列の家族や列名を持つレコードの値を取得することができています。スパークストリーミングアプリケーションを起動するたびに、このコードスニペットが実行され、値を取得できますが、もう実行されません。私はカフカからレコードを受け取るたびにcfとcolumnを使ってHBaseから自分のレコードを読みたいので、これは私にとっては役に立ちません。
これを解決するにはロジックをforeachRDD()
に移動しますが、残念ながらsparkContextはシリアル化できないようです。 task is not serialzable
のようなエラーが表示されます。
最後に、hbase.clinet HTableを使用してhbaseからデータを読み取る別の方法があることが判明しました。これが私の最後の仕事である:私はforeachRDDで上記の方法を使用mainメソッドで
def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = {
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set("hbase.zookeeper.quorum", "xxxxxx")
conf.set("hbase.master", "xxxx")
conf.set("hbase.zookeeper.property.clientPort", "xxx")
conf.set(TableInputFormat.INPUT_TABLE, "xx")
conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx")
val testTable = new HTable(conf, "testTable")
val scan = new Scan
scan.addColumn("cf1".getBytes, "test".getBytes)
val rs = testTable.getScanner(scan)
var r = rs.next()
val res = new StringBuilder
while(r != null){
val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes))
res.append(tmp)
r= rs.next()
}
val res = res.toString
//do the following manipulations and return object (ImmutableBytesWritable, Put)
..............................
.......................
}
とこれが今の私のために正常に動作しますが、私は疑問を持っている方法saveAsNewAPIHadoopDataset
streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration))
を使用してのHBaseに保存しますこのプロセスについて:
このようにして、RDDのすべてのパーティションについて、HBaseへの接続が作成されると思います。私はそれが私のアプリをスケールアップすることが可能かどうか疑問に思っています。 1秒間に1000個以上のレコードがあるとしたら、スパークストリーミングでは1000個の接続が設定されているようです。
hbaseからデータを読み取る正しい方法はありますか? sparkStreamingでHBaseのデータを読み取るベストプラクティスは何ですか?または、スパークストリーミングがデータを読み込むことは想定されていません。ストリームデータをDBに書き込むように設計されています。
ありがとうございます。
私の質問にお答えいただきありがとうございます。 transferToHBasePutメソッドのパラメータとしてconfを渡して、あなたの解決策を試しました。しかし、foreachが個々のエグゼキュータのjvmプロセスで実行すると言ったように、シングルトンはドライバからワーカーに転送できません。私はそれが設定可能ではないためだと思う。最後に、foreachPartitionというメソッドがRDDで使用できることがわかりました。この方法では、RDDのパーティションごとに1回のみ接続を確立することができます。 – Frankie