2016-08-02 14 views
0

HBaseテーブルにいくつかのマスタ情報があるストリームデータがあります。すべての行について、私はHBaseマスタテーブルを参照して、いくつかのプロファイル情報を取得する必要があります。私のコードは次のようなものですforeach内のSparkストリーミングフィルタ条件 - NullPointerException

val con    = new setContext(hadoopHome,sparkMaster) 
val l_sparkcontext = con.getSparkContext 
val l_hivecontext = con.getHiveContext 

val topicname  = "events" 
val ssc    = new StreamingContext(l_sparkcontext, Seconds(30)) 
val eventsStream = KafkaUtils.createStream(ssc,"xxx.xxx.142.xxx:2181","receive_rest_events",Map(topicname.toString -> 10)) 
println("Kafka Stream for receiving Events..") 

val profile_data = l_hivecontext.sql("select gender, income, age, riid from hbase_customer_profile") 
profile_data.foreach(println) 
val tabBC = l_sparkcontext.broadcast(profile_data) 

eventsStream.foreachRDD(rdd => { 
    rdd.foreach(record => { 
    val subs_profile_rows = tabBC.value 
    val Rows = record._2.split(rowDelim) 
    Rows.foreach(row => { 
     val values = row.split(colDelim) 
     val riid = values(1).toInt 
     val cond = "riid = " + riid 
     println("Condition : ", cond) 
     val enriched_events = subs_profile_rows.filter(cond) 
    }) // End of Rows 
    }) // End of RDD 
}) // End of Events Stream 

残念ながら、私はいつもフィルターのNPEに当たっています。私はワーカーノードを越えて値をブロードキャストするためにここではほとんど質問と回答に従っていませんでしたが、何も役立たないものです。誰か助けてください。

よろしく

バラ

+0

シリアル化できない値を使用しているかどうかを確認してください。 – cchantep

+0

profile_dataをforeach内で作成しなければならないかどうかはわかりませんが、それはシリアル化できないものです。 –

答えて

0

あなたのコンテキストの使用状況は、魚のビットに見える...私には、次の2つの別々のコンテキスト(1つのスパーク、スパーク・ストリーミングのためのもの)を作成し、しようとしているように見えますそれらのコンテキスト間でブロードキャスト変数を共有します(これは動作しません)。

私たちがこれと似たようなコードを書いています。興味のある場合に備えて、Splice Machine(オープンソース)でどのように行ったかを示す動画があります。コードを見つけようとするか、他の人にあなたのために投稿させてもらいます。

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-part/

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-ii/

幸運。

+0

Johnに感謝します。私はビデオを調べます。要件は、DStreamから出力されるデータのHBaseテーブルからプロファイル情報を読み取ることです。私はforEachPartitionも持っています(変更されたコードを次のコメントとして投稿します)が、それは私に異なるエラーを与えます。あなたがそれを得ることができるなら、私はコードを待つでしょう。助けをありがとうございます –

+0

スペースの制限のため、私は2つの投稿に自分のコードを分割する必要があります。 - 開始クラスsetContext(argHadoopHome:String、argSparkMaster:String){ System.setProperty( "hadoop.home.dir"、argHadoopHome) val conf = new SparkConf()。setMaster(argSparkMaster); conf.setAppName( "Evts");配列:(CONF) プライベートヴァルl_hiveContext =新しいHiveContext(l_valSparkContext) DEF getSparkContext = l_valSparkContext DEF getHiveContext = l_hiveContext DEF getconfContext = CONF } –

+0

オブジェクトreceiveEvents { デフメイン(引数 プライベートヴァルl_valSparkContext =新しいSparkContext [String]):ユニット= { var rD = "\ r \ n" var cD = "、" var sM = "spark:// nm2:7077" var ip = "nm2:2181" var hadoopHome = "/ home/.." val con =新しいsetContext(ip、sM) val l_sparkcontext = (ssc、 "nm2:2181"、 "rcv"、map(topicname.toString);val topicname = "evt" val ssc =新しいStreamingContext(l_sparkcontext、Seconds(9)) val eventsStream \t = KafkaUtils.createStream - > 2)) val profile_data = w_hivecontext.sql( "性別、所得、年齢をhb_cust_proから選択") –