次のエラーが発生します(私のアプリケーションではフォークが原因であると想定しています)、 "この結果オブジェクトは行を返しません"。forking、sqlalchemy、およびスコープのあるセッション
Traceback
---------
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 263, in execute_task
result = _execute_task(task, data)
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 245, in _execute_task
return func(*args2)
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/smg/analytics/services/impact_analysis.py", line 140, in _do_impact_analysis_mp
Correlation.user_id.in_(user_ids)).all())
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2241, in all
return list(self)
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 65, in instances
fetch = cursor.fetchall()
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 752, in fetchall
self.cursor, self.context)
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1027, in _handle_dbapi_exception
util.reraise(*exc_info)
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 746, in fetchall
l = self.process_rows(self._fetchall_impl())
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 715, in _fetchall_impl
self._non_result()
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 720, in _non_result
"This result object does not return rows. "
私はDASKを使用することだし、それは(multiprocessing.Pool
を使用する)スケジューラをマルチプロセッシングています。 (ドキュメントに基づいて)私が理解しているように、スコープ付きセッションオブジェクト(scoped_session()
で作成)から作成されたセッションはスレッドセーフです。これはスレッドローカルなためです。これは、私がSession()
(またはプロキシSession
を使用して)呼び出すとき、私はそこに存在し、それが呼び出されたスレッドからのみアクセス可能なセッションオブジェクトを取得していると信じるようになります。 これはかなり簡単です。
私が混乱しているのは、なぜプロセスをフォークするときに問題があるのかということです。私はあなたが プロセス間エンジンを再使用できないことを理解するので、私は、ドキュメントからの逐語的にイベント・ベースのソリューションを踏襲し、これをやった:
class _DB(object):
_engine = None
@classmethod
def _get_engine(cls, force_new=False):
if cls._engine is None or force_new is True:
cfg = Config.get_config()
user = cfg['USER']
host = cfg['HOST']
password = cfg['PASSWORD']
database = cfg['DATABASE']
engine = create_engine(
'mysql://{}:{}@{}/{}?local_infile=1&'
'unix_socket=/var/run/mysqld/mysqld.sock'.
format(user, password, host, database),
pool_size=5, pool_recycle=3600)
cls._engine = engine
return cls._engine
# From the docs, handles multiprocessing
@event.listens_for(_DB._get_engine(), "connect")
def connect(dbapi_connection, connection_record):
connection_record.info['pid'] = os.getpid()
#From the docs, handles multiprocessing
@event.listens_for(_DB._get_engine(), "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
pid = os.getpid()
if connection_record.info['pid'] != pid:
connection_record.connection = connection_proxy.connection = None
raise exc.DisconnectionError(
"Connection record belongs to pid %s, "
"attempting to check out in pid %s" %
(connection_record.info['pid'], pid)
)
# The following is how I create the scoped session object.
Session = scoped_session(sessionmaker(
bind=_DB._get_engine(), autocommit=False, autoflush=False))
Base = declarative_base()
Base.query = Session.query_property()
(ドキュメントに基づいて)だから私の仮定次のとおりです。
はスコープのセッションオブジェクトから作成されたセッションオブジェクトを使用して、それは常に私に(私の場合だけで、子プロセスのメインスレッドになります)ThreadLocalのセッションを与える必要があります。ドキュメントにはありませんが、スコープされたセッションオブジェクトが別のプロセスで作成されたとしても、これが適用されるはずです。
ThreadLocalのセッションでは、接続は、このプロセス内で作成されていない場合、それは新しいものを作成し、エンジンを経由してプールから接続を取得します(上記
connection()
とcheckout()
の実装に基づいています。)
これらのことが両方とも真実ならば、すべてが「うまくいく」(AFAICT)でなければなりません。しかし、そうではありません。
新しいスコープのセッションオブジェクトを新しいプロセスごとに作成し、セッションを使用して以降のすべての呼び出しで を使用することで、それを動作させることができました。
BTW Base.query
属性は、この新しいスコープ付きセッションオブジェクトからも更新する必要がありました。
上記の#1の仮定が間違っていると思います。なぜ私は各プロセスで新しいスコープのセッションオブジェクトを作成する必要があるのか理解できる人はいますか?
乾杯。
あなたはフォークコードだけでなく、完全なスタックトレースを含め、最小限の例を投稿することができますか?私は、接続プールがフォークの前にDBに接続していることが疑わしいので、両方のプロセスがソケットを共有しています。 – univerio
サンプルコードをいくつか追加します。プールは、フォークが行われる前に確実にすでに接続されていますが、プールを使用する子プロセスは、使用する前に呼び出しコードのpidを調べるか、新しいコードを作成することによって処理されます(前述の 'checkout'メソッドによる)。それとも少なくともそれはAIUIの意図です。 –
'dask.multiprocessing.get'を使用するのではなく、単一のノード分散スケジューラを作成することができます。これは、よりクリーンなプロセスからのフォークであり、一般にクリーンな経験です:http://dask.pydata.org/en/latest/scheduler-choice.html – MRocklin