Apache Sparkで利用可能なmapPartition
を使用して、すべてのRDDのMySQL接続の初期化を防止することをお勧めします。
これは私が作成したMySQLのテーブルです:次の値を持つ
create table test2(id varchar(10), state varchar(10));
:
+------+---------+
| id | state |
+------+---------+
| 1 | success |
| 2 | stopped |
+------+---------+
参考として、次のPySparkコードを使用します。
import MySQLdb
data1=[["1", "afdasds"],["2","dfsdfada"],["3","dsfdsf"]] #sampe data, in your case streaming data
rdd = sc.parallelize(data1)
def func1(data1):
con = MySQLdb.connect(host="127.0.0.1", user="root", passwd="yourpassword", db="yourdb")
c=con.cursor()
c.execute("select * from test2;")
data=c.fetchall()
dict={}
for x in data:
dict[x[0]]=x[1]
list1=[]
for x in data1:
if x[0] in dict:
list1.append([x[0], x[1], dict[x[0]]])
else:
list1.append([x[0], x[1], "none"]) # i assign none if id in table and one received from streaming dont match
return iter(list1)
print rdd.mapPartitions(func1).filter(lambda x: "none" not in x[2]).collect()
出力こと私は持っていた:
[['1', 'afdasds', 'success'], ['2', 'dfsdfada', 'stopped']]
MySQLへの繰り返し接続を解決するには、 'mapPartition'変換に続いて' filter'を使うことをお勧めします。 – CoDhEr