私は仕事で私のニーズに合ったプロデューサー/消費者を書いています。データベースにデータ永続性を持つPythonプロデューサ/コンシューマ?
一般に、リモートサーバからログを取得してキューに入れるプロデューサスレッドがあります。キューからデータを読み込んで何らかの作業を行う1つ以上のコンシューマスレッド。その後、後で分析するために、データと結果の両方を保存する必要があります(例:sqlite3 db)。
ログの各部分を1回だけ処理できるようにするには、データを消費するたびにデータベースをクエリして完了したかどうかを確認する必要があります。私はこれを達成するためのよりよい方法があるのだろうかと思います。複数のコンシューマスレッドが存在する場合、データベースのロックが問題になるようです。関連
コード:
import Queue
import threading
import requests
out_queue = Queue.Queue()
class ProducerThread(threading.Thread):
def __init__(self, out_queue):
threading.Thread.__init__(self)
self.out_queue = out_queue
def run(self):
while True:
# Read remote log and put chunk in out_queue
resp = requests.get("http://example.com")
# place chunk into out queue and sleep for some time.
self.out_queue.put(resp)
time.sleep(10)
class ConsumerThread(threading.Thread):
def __init__(self, out_queue):
threading.Thread.__init__(self)
self.out_queue = out_queue
def run(self):
while True:
# consume the data.
chunk = self.out_queue.get()
# check whether chunk has been consumed before. query the database.
flag = query_database(chunk)
if not flag:
do_something_with(chunk)
# signals to queue job is done
self.out_queue.task_done()
# persist the data and other info insert to the database.
data_persist()
else:
print("data has been consumed before.")
def main():
# just one producer thread.
t = ProducerThread(out_queue)
t.setDaemon(True)
t.start()
for i in range(3):
ct = ConsumerThread(out_queue)
ct.setDaemon(True)
ct.start()
# wait on the queue until everything has been processed
out_queue.join()
main()
データベースの大きさによって異なります。キャッシュを使用できます。メモリまたは並列No-SQLデータベースにタプルを保持する。タプル内のチャンクのハッシュを使用し、 'in'演算子を使用してそれが既に存在するかどうかを確認します。 Redisのようなものを使うと、代わりに大きなキャッシュを永続させることができます。この場合、結果が 'None'ではない場合は' Redis.get(チャンク) 'を尋ねるだけです。 – tuned
またはタスクに適合する任意のNo-SQL。おそらくmemcachedもこの場合です。 – tuned
@tunedconsulting Hmm ...Redisは私が掘り下げているものです。しかし、私はまだRedisとディスク上のデータベースファイルとの関係について少しは困惑しています。だから、消費者は最初にRedisキャッシュに結果を保存します。しばらくしてから、キャッシュをデータベースに保存しますか?また、マルチスレッドの問題については、プロデューサ側で「チェックイン済み」プロセスが発生する必要がありますか? – dofine