2012-03-08 83 views
21

scoped_sessionを使用してセッションオブジェクトを構築し、返されたセッションを使用すると、データベースセッションを効率的に正しく開いたり閉じたりする方法がわかりません。オブジェクトを作成するにはスレッドセーフなので、基本的にはすべてのスレッドが独自のセッションを取得し、問題は発生しません。今、以下のサンプルが動作します。セッションを正しく閉じているかどうかを調べるために無限ループに入れています。正しく監視した場合(「SHOW PROCESSLIST;」を実行してmysqlで)、コネクションは成長し続けます。私はsession.close()を使っていましたが、各実行の終わりにscoped_sessionオブジェクトを削除することさえできました。私は間違って何をしていますか?より大きいアプリケーションで私の目標は、必要な最小限のデータベース接続数を使用することです。現在の実装では、必要なすべてのメソッドに新しいセッションを作成して戻す前に閉じます。SQLAlchemyマルチスレッドアプリケーションでの適切なセッション処理

from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker, scoped_session 
from threading import Thread 
from Queue import Queue, Empty as QueueEmpty 
from models import MyModel 


DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
     self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
     self.DBSession = scoped_session(
      sessionmaker(
       autoflush=True, 
       autocommit=False, 
       bind=self.db_engine 
      ) 
     ) 

    def _worker(self): 
     db_session = self.DBSession() 
     while True: 
      try: 
       task_id = self.task_queue.get(False) 
       try: 
        item = db_session.query(MyModel).filter(MyModel.id == task_id).one() 
        # do something with item 
       except Exception as exc: 
        # if an error occurrs we skip it 
        continue 

       finally: 
        db_session.commit() 
        self.task_queue.task_done() 
      except QueueEmpty: 
       db_session.close() 
       return 

    def start(self): 
     try: 
      db_session = self.DBSession() 
      all_items = db_session.query(MyModel).all() 
      for item in all_items: 
       self.task_queue.put(item.id) 

      for _i in range(self.worker_count): 
       t = Thread(target=self._worker) 
       t.start() 

      self.task_queue.join() 
     finally: 
      db_session.close() 
      self.DBSession.remove() 


if __name__ == '__main__': 
    while True: 
     mt_worker = MTWorker(worker_count=50) 
     mt_worker.start() 

答えて

36

だけです(データベースあたり) プロセスごとに一度create_enginescoped_sessionを呼び出す必要があります。それぞれは独自の接続プールまたはセッションプールを取得します( )ので、と1つのプールを作成していることを確認してください。モジュールレベルのグローバルにしてください。あなたはより多くのprecieslyそれよりも、あなたのセッションを管理する必要がある場合、あなたはおそらく作るためにscoped_session

もう一つの変化を利用してしてはならないことは、それは セッションであるかのように直接DBSessionを使用することです。 scoped_sessionでセッションメソッドを呼び出すと、透過的に スレッドローカルセッションを作成し、メソッド呼び出しを セッションに転送します。

は、デフォルトでは5で接続プールの pool_size で注意すべきもう一つ。多くのアプリケーションでは罰金だということが、あなたは、スレッドの 多くを作成している場合、あなたはそれが確かに非常に役立ちました、パラメータ

DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
DBSession = scoped_session(
    sessionmaker(
     autoflush=True, 
     autocommit=False, 
     bind=db_engine 
    ) 
) 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
# snip 
+1

、情報ありがとうございことをチューニングする必要がある場合があります。キングは尊敬する! – andrean

関連する問題