2016-09-29 24 views
3

私はKafkaからデータを受け取り、Hbaseテーブルに書き込むプロジェクトを作成しています。レコードの差分を知りたいので、最初にHbaseで同じ行キーを使用してレコードを取得し、受信レコードで減算を行い、最終的に新しいレコードをHBaseテーブルに保存する必要があります。Spark StreamingでHbaseデータを読み取る

最初に、newAPIHadoopを使用してhbaseからデータを取得しようとしました。ここに私の試みは次のとおりです。

val conf = HBaseConfiguration.create() 
conf.set("zookeeper.znode.parent", "/hbase-secure") 
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) 
conf.set("hbase.zookeeper.quorum", zkQuorum) 
conf.set("hbase.master", masterAddr) 
conf.set("hbase.zookeeper.property.clientPort", portNum) 
conf.set(TableInputFormat.INPUT_TABLE, tableName) 
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName) 

val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf, 
     classOf[TableInputFormat], 
     classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
     classOf[org.apache.hadoop.hbase.client.Result]) 

このところで、私は一度だけ特定の列の家族や列名を持つレコードの値を取得することができています。スパークストリーミングアプリケーションを起動するたびに、このコードスニペットが実行され、値を取得できますが、もう実行されません。私はカフカからレコードを受け取るたびにcfとcolumnを使ってHBaseから自分のレコードを読みたいので、これは私にとっては役に立ちません。

これを解決するにはロジックをforeachRDD()に移動しますが、残念ながらsparkContextはシリアル化できないようです。 task is not serialzableのようなエラーが表示されます。

最後に、hbase.clinet HTableを使用してhbaseからデータを読み取る別の方法があることが判明しました。これが私の最後の仕事である:私はforeachRDDで上記の方法を使用mainメソッドで

def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = { 
    val conf = HBaseConfiguration.create() 
    conf.set("zookeeper.znode.parent", "/hbase-secure") 
    conf.set("hbase.zookeeper.quorum", "xxxxxx") 
    conf.set("hbase.master", "xxxx") 
    conf.set("hbase.zookeeper.property.clientPort", "xxx") 
    conf.set(TableInputFormat.INPUT_TABLE, "xx") 
    conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx") 

    val testTable = new HTable(conf, "testTable") 
    val scan = new Scan 
    scan.addColumn("cf1".getBytes, "test".getBytes) 
    val rs = testTable.getScanner(scan) 

    var r = rs.next() 
    val res = new StringBuilder 
    while(r != null){ 
     val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes)) 

     res.append(tmp) 
     r= rs.next() 
    } 
val res = res.toString 

//do the following manipulations and return object (ImmutableBytesWritable, Put) 
     .............................. 
     ....................... 
      } 

とこれが今の私のために正常に動作しますが、私は疑問を持っている方法saveAsNewAPIHadoopDataset

streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration)) 

を使用してのHBaseに保存しますこのプロセスについて:

このようにして、RDDのすべてのパーティションについて、HBaseへの接続が作成されると思います。私はそれが私のアプリをスケールアップすることが可能かどうか疑問に思っています。 1秒間に1000個以上のレコードがあるとしたら、スパークストリーミングでは1000個の接続が設定されているようです。

hbaseからデータを読み取る正しい方法はありますか? sparkStreamingでHBaseのデータを読み取るベストプラクティスは何ですか?または、スパークストリーミングがデータを読み込むことは想定されていません。ストリームデータをDBに書き込むように設計されています。

ありがとうございます。

答えて

0

foreachRDDは、個々のエグゼキュータのjvmプロセスで実行されます。少なくとも、confのsingletonインスタンス(jvmプロセスまたは新しいconfの既存のset confを使用する前にnullチェックを持つことを意味します)をtransferToHBasePutメソッドで取得できます。これによりSparkクラスタで生成されたエグゼキュータの数とHbaseの接続数が減少します。このことができます

希望...

+0

私の質問にお答えいただきありがとうございます。 transferToHBasePutメソッドのパラメータとしてconfを渡して、あなたの解決策を試しました。しかし、foreachが個々のエグゼキュータのjvmプロセスで実行すると言ったように、シングルトンはドライバからワーカーに転送できません。私はそれが設定可能ではないためだと思う。最後に、foreachPartitionというメソッドがRDDで使用できることがわかりました。この方法では、RDDのパーティションごとに1回のみ接続を確立することができます。 – Frankie

3

いくつか学習した後、私はRDDの各パーティションの設定を作成します。 foreachRDDのデザインパターンをSpark Streaming official websiteにチェックしてください。実際に設定は接続ではないので、Hbaseのレコードを取得して置くために既存の接続プールから接続を取得する方法はわかりません。

+0

スパークストリーミングでHBaseを読むことはできましたか?私は各データの接続を開くだけでそれを読むことができます。それを行う方法は何ですか? – zorkaya

関連する問題