2016-04-10 16 views
0

私は仕事で私のニーズに合ったプロデューサー/消費者を書いています。データベースにデータ永続性を持つPythonプロデューサ/コンシューマ?

一般に、リモートサーバからログを取得してキューに入れるプロデューサスレッドがあります。キューからデータを読み込んで何らかの作業を行う1つ以上のコンシューマスレッド。その後、後で分析するために、データと結果の両方を保存する必要があります(例:sqlite3 db)。

ログの各部分を1回だけ処理できるようにするには、データを消費するたびにデータベースをクエリして完了したかどうかを確認する必要があります。私はこれを達成するためのよりよい方法があるのだろうかと思います。複数のコンシューマスレッドが存在する場合、データベースのロックが問題になるようです。関連

コード:

import Queue 
import threading 
import requests 

out_queue = Queue.Queue() 


class ProducerThread(threading.Thread): 
    def __init__(self, out_queue): 
     threading.Thread.__init__(self) 
     self.out_queue = out_queue 

    def run(self): 
     while True: 
      # Read remote log and put chunk in out_queue 
      resp = requests.get("http://example.com") 

      # place chunk into out queue and sleep for some time. 
      self.out_queue.put(resp) 
      time.sleep(10) 


class ConsumerThread(threading.Thread): 
    def __init__(self, out_queue): 
     threading.Thread.__init__(self) 
     self.out_queue = out_queue 

    def run(self): 
     while True: 
      # consume the data. 
      chunk = self.out_queue.get() 

      # check whether chunk has been consumed before. query the database. 
      flag = query_database(chunk) 
      if not flag: 
       do_something_with(chunk) 

       # signals to queue job is done 
       self.out_queue.task_done() 

       # persist the data and other info insert to the database. 
       data_persist() 
      else: 
       print("data has been consumed before.") 


def main(): 

    # just one producer thread. 
    t = ProducerThread(out_queue) 
    t.setDaemon(True) 
    t.start() 

    for i in range(3): 
     ct = ConsumerThread(out_queue) 
     ct.setDaemon(True) 
     ct.start() 

    # wait on the queue until everything has been processed 
    out_queue.join() 

main() 
+0

データベースの大きさによって異なります。キャッシュを使用できます。メモリまたは並列No-SQLデータベースにタプルを保持する。タプル内のチャンクのハッシュを使用し、 'in'演算子を使用してそれが既に存在するかどうかを確認します。 Redisのようなものを使うと、代わりに大きなキャッシュを永続させることができます。この場合、結果が 'None'ではない場合は' Redis.get(チャンク) 'を尋ねるだけです。 – tuned

+0

またはタスクに適合する任意のNo-SQL。おそらくmemcachedもこの場合です。 – tuned

+0

@tunedconsulting Hmm ...Redisは私が掘り下げているものです。しかし、私はまだRedisとディスク上のデータベースファイルとの関係について少しは困惑しています。だから、消費者は最初にRedisキャッシュに結果を保存します。しばらくしてから、キャッシュをデータベースに保存しますか?また、マルチスレッドの問題については、プロデューサ側で「チェックイン済み」プロセスが発生する必要がありますか? – dofine

答えて

1

ログには、リモートサーバーを読めば繰り返し/重複していない場合、ログが複数回処理されたかどうかを確認する必要がQueue class implements all the required locking semanticsとして、存在しないため、Queue.get( )は、特定のアイテムが1つのConsumerThreadによってのみ取得できることを保証します。

ログが重複する可能性がある場合は、ConsumerThreadをチェックするのではなく、ProducerThread(ログをキューに追加する前)でチェックする必要があります。このようにして、ロックを考慮する必要はありません。以下で要件についての私の理解した上での@ dofineの確認に基づいて

更新がコメント:

ポイント#2、#3のために、あなたはqueuelibでFifoDiskQueueなど軽量永続キューが必要な場合があります。正直言って、私は前にこのライブラリを使用していないが、私はあなたのために動作するはずだと思う。 libをチェックしてください。ポイント#1の

、私はあなたがFifoDiskQueueの別のキューとの組み合わせで、どんな(非メモリー)データベースを使用してそれを達成することができますね。

  • 第二キューが再キューイングの目的を果たします1つのコンシューマスレッドで処理できない場合はすぐにログに記録します。アイデアについては、私の最初のコメントをご覧ください。
  • データベースには1つのテーブルがあります。プロデューサスレッドは常に新しいレコードを追加しますが、レコードを更新することはありません。コンシューマスレッドは、キューから取り出したレコードを更新するだけです。上記のロジックを持つキュー
  • を更新すると、アプリケーションの起動時(コンシューマを起動する前に)にテーブル
  • をロックする必要はありません。アプリケーションの予期しない終了

このアップデートは、モバイルで入力されたSO、それは一種の不便、それを拡張することであるにトラックに「失われた」されているそれらのログのデシベル。必要に応じて、私がチャンスを得たときに再び更新します。

+0

ありがとう!ログから必要なデータを複製することができます。ユーザーは1日に数回ログインすることができましたが、1日に1回彼にメッセージを送るだけで済みます。だから私はProducerのチェックをしてから、このユーザがプッシュメッセージを受け取った場所を確認してください。 – dofine

+0

上記のように、ProducerThreadでdictを使用することで簡単にチェックできます。しかし、ユーザーがプッシュメッセージを受信したかどうかを決して判断することはできません(応答がない限り)。代わりに、あなたがメッセージを正常に送信したかどうかを知ることができます.CentumerThreadで処理できないチャンクを保存する別のキューを確立し、この新しいキューのプロデューサをConsumerThreadにし、 ProducerThreadを単一のコンシューマにする(チャンクを最初のキューに戻す)。 –

+0

sendメッセージ部分は、応答を返す別のAPI呼び出しによって提供されます。それは問題ではないはずです。しかし、データベースとマルチスレッドに関しては初心者であることを私に許してください。 :)チェックを行うには辞書を使うのは簡単です。 db-save部分はどうですか?いつ私はそれをするべきですか?私が正しいことを理解すれば、私は実際のデータベースの「キャッシュ」としてdict(または何でも、redis)を使用できますか? – dofine

関連する問題