2017-02-09 7 views
2

次のエラーが発生します(私のアプリケーションではフォークが原因であると想定しています)、 "この結果オブジェクトは行を返しません"。forking、sqlalchemy、およびスコープのあるセッション

Traceback 
--------- 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 263, in execute_task 
result = _execute_task(task, data) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 245, in _execute_task 
return func(*args2) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/smg/analytics/services/impact_analysis.py", line 140, in _do_impact_analysis_mp 
Correlation.user_id.in_(user_ids)).all()) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2241, in all 
return list(self) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 65, in instances 
fetch = cursor.fetchall() 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 752, in fetchall 
self.cursor, self.context) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1027, in _handle_dbapi_exception 
util.reraise(*exc_info) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 746, in fetchall 
l = self.process_rows(self._fetchall_impl()) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 715, in _fetchall_impl 
self._non_result() 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 720, in _non_result 
"This result object does not return rows. " 

私はDASKを使用することだし、それは(multiprocessing.Poolを使用する)スケジューラをマルチプロセッシングています。 (ドキュメントに基づいて)私が理解しているように、スコープ付きセッションオブジェクト(scoped_session()で作成)から作成されたセッションはスレッドセーフです。これはスレッドローカルなためです。これは、私がSession()(またはプロキシSessionを使用して)呼び出すとき、私はそこに存在し、それが呼び出されたスレッドからのみアクセス可能なセッションオブジェクトを取得していると信じるようになります。 これはかなり簡単です。

私が混乱しているのは、なぜプロセスをフォークするときに問題があるのか​​ということです。私はあなたが プロセス間エンジンを再使用できないことを理解するので、私は、ドキュメントからの逐語的にイベント・ベースのソリューションを踏襲し、これをやった:

class _DB(object): 

    _engine = None 

    @classmethod 
    def _get_engine(cls, force_new=False): 
     if cls._engine is None or force_new is True: 
      cfg = Config.get_config() 
      user = cfg['USER'] 
      host = cfg['HOST'] 
      password = cfg['PASSWORD'] 
      database = cfg['DATABASE'] 
      engine = create_engine(
       'mysql://{}:{}@{}/{}?local_infile=1&' 
       'unix_socket=/var/run/mysqld/mysqld.sock'. 
        format(user, password, host, database), 
       pool_size=5, pool_recycle=3600) 
      cls._engine = engine 
     return cls._engine 



# From the docs, handles multiprocessing 
@event.listens_for(_DB._get_engine(), "connect") 
def connect(dbapi_connection, connection_record): 
    connection_record.info['pid'] = os.getpid() 

#From the docs, handles multiprocessing 
@event.listens_for(_DB._get_engine(), "checkout") 
def checkout(dbapi_connection, connection_record, connection_proxy): 
    pid = os.getpid() 
    if connection_record.info['pid'] != pid: 
     connection_record.connection = connection_proxy.connection = None 
     raise exc.DisconnectionError(
      "Connection record belongs to pid %s, " 
      "attempting to check out in pid %s" % 
      (connection_record.info['pid'], pid) 
     ) 


# The following is how I create the scoped session object. 

Session = scoped_session(sessionmaker(
    bind=_DB._get_engine(), autocommit=False, autoflush=False)) 

Base = declarative_base() 
Base.query = Session.query_property() 

(ドキュメントに基づいて)だから私の仮定次のとおりです。

  1. はスコープのセッションオブジェクトから作成されたセッションオブジェクトを使用して、それは常に私に(私の場合だけで、子プロセスのメインスレッドになります)ThreadLocalのセッションを与える必要があります。ドキュメントにはありませんが、スコープされたセッションオブジェクトが別のプロセスで作成されたとしても、これが適用されるはずです。

  2. ThreadLocalのセッションでは、接続は、このプロセス内で作成されていない場合、それは新しいものを作成し、エンジンを経由してプールから接続を取得します(上記connection()checkout()の実装に基づいています。)

これらのことが両方とも真実ならば、すべてが「うまくいく」(AFAICT)でなければなりません。しかし、そうではありません。

新しいスコープのセッションオブジェクトを新しいプロセスごとに作成し、セッションを使用して以降のすべての呼び出しで を使用することで、それを動作させることができました。

BTW Base.query属性は、この新しいスコープ付きセッションオブジェクトからも更新する必要がありました。

上記の#1の仮定が間違っていると思います。なぜ私は各プロセスで新しいスコープのセッションオブジェクトを作成する必要があるのか​​理解できる人はいますか?

乾杯。

+0

あなたはフォークコードだけでなく、完全なスタックトレースを含め、最小限の例を投稿することができますか?私は、接続プールがフォークの前にDBに接続していることが疑わしいので、両方のプロセスがソケットを共有しています。 – univerio

+0

サンプルコードをいくつか追加します。プールは、フォークが行われる前に確実にすでに接続されていますが、プールを使用する子プロセスは、使用する前に呼び出しコードのpidを調べるか、新しいコードを作成することによって処理されます(前述の 'checkout'メソッドによる)。それとも少なくともそれはAIUIの意図です。 –

+0

'dask.multiprocessing.get'を使用するのではなく、単一のノード分散スケジューラを作成することができます。これは、よりクリーンなプロセスからのフォークであり、一般にクリーンな経験です:http://dask.pydata.org/en/latest/scheduler-choice.html – MRocklin

答えて

0

フォークが発生したときは明確ではありませんが、fork_size = 5でデータベースへのTCP接続を初期化するフォークの前にエンジンが作成され、新しいプロセスにコピーされます。複数のプロセスが同じ物理ソケットと対話する=>問題が発生します。

  • プールを無効にし、オンデマンド接続に使用:sqla_engine:poolclassは= NullPool
  • フォーク後にプールを再作成

    オプションです。dispose()

  • 遅延フォーク後までcreate_engine
+0

エンジンが作成された後は間違いなくフォークされていますが、カスタムの 'checkout()'メソッドの1つのプールを使用して複数のプロセスを処理します。私は、それがダスクがプロセスをフォークする方法と関係があると考えています。 –

関連する問題