2017-03-10 11 views
0

私はスパークストリーミングを使用してKafkaからデータをストリーミングし、データ判定をMySqlのデータでフィルタリングしたいと考えています。私は用語の状態を取得するためにMySQLを照会する必要がspark rdd fliter by query mysql

id | state 
1 | "success" 

例えば、私はカフカからのデータがちょうど好きを取得:

{"id":1, "data":"abcdefg"} 

をし、このようなMySQLでデータがありますid。 私は、フィルタの機能でMySqlへの接続を定義することができ、それは動作します。

しかし、それはRDDのすべての行に対して接続を定義し、多くのコンピューティングリソースを無駄にします。

フィルタの操作方法は?

+0

MySQLへの繰り返し接続を解決するには、 'mapPartition'変換に続いて' filter'を使うことをお勧めします。 – CoDhEr

答えて

1

Apache Sparkで利用可能なmapPartitionを使用して、すべてのRDDのMySQL接続の初期化を防止することをお勧めします。

これは私が作成したMySQLのテーブルです:次の値を持つ

create table test2(id varchar(10), state varchar(10)); 

+------+---------+ 
| id | state | 
+------+---------+ 
| 1 | success | 
| 2 | stopped | 
+------+---------+ 

参考として、次のPySparkコードを使用します。

import MySQLdb 

data1=[["1", "afdasds"],["2","dfsdfada"],["3","dsfdsf"]] #sampe data, in your case streaming data 
rdd = sc.parallelize(data1) 

def func1(data1): 
    con = MySQLdb.connect(host="127.0.0.1", user="root", passwd="yourpassword", db="yourdb") 
    c=con.cursor() 
    c.execute("select * from test2;") 
    data=c.fetchall() 
    dict={} 
    for x in data: 
     dict[x[0]]=x[1] 
    list1=[] 
    for x in data1: 
     if x[0] in dict: 
      list1.append([x[0], x[1], dict[x[0]]]) 
     else: 
      list1.append([x[0], x[1], "none"]) # i assign none if id in table and one received from streaming dont match 
    return iter(list1) 

print rdd.mapPartitions(func1).filter(lambda x: "none" not in x[2]).collect() 

出力こと私は持っていた:

[['1', 'afdasds', 'success'], ['2', 'dfsdfada', 'stopped']] 
+0

ありがとう!試してみます – xiang