4

私は誰かが私を助けてくれることを望んでいます。Python 3.5 asyncioは、異なるスレッドの同期コードからイベントループでcoroutineを実行します。

私はコルーチンオブジェクトを返す属性を持つオブジェクトを持っています。これはきれいに動作しますが、イベントループが現在実行されている間に、独立したスレッドで同期コードからコルーチンオブジェクトの結果を取得する必要がある状況があります。私が思いついたコードは次のとおりです。

def get_sync(self, key: str, default: typing.Any=None) -> typing.Any: 
    """ 
    Get an attribute synchronously and safely. 

    Note: 
     This does nothing special if an attribute is synchronous. It only 
     really has a use for asynchronous attributes. It processes 
     asynchronous attributes synchronously, blocking everything until 
     the attribute is processed. This helps when running SQL code that 
     cannot run asynchronously in coroutines. 

    Args: 
     key (str): The Config object's attribute name, as a string. 
     default (Any): The value to use if the Config object does not have 
      the given attribute. Defaults to None. 

    Returns: 
     Any: The vale of the Config object's attribute, or the default 
     value if the Config object does not have the given attribute. 
    """ 
    ret = self.get(key, default) 

    if asyncio.iscoroutine(ret): 
     if loop.is_running(): 
      loop2 = asyncio.new_event_loop() 
      try: 
       ret = loop2.run_until_complete(ret) 

      finally: 
       loop2.close() 
     else: 
      ret = loop.run_until_complete(ret) 

    return ret 

私が探しています何同期マルチスレッド環境でコルーチンオブジェクトの結果を得るために安全な方法です。 self.get()は、それらを提供するように設定した属性に対して、コルーチンオブジェクトを返すことができます。私が見つけた問題は、イベントループが実行中であるかどうかです。スタックのオーバーフローや他のいくつかのサイトで数時間を検索した後、私の(壊れた)解決策は上記のとおりです。ループが実行されている場合は、新しいイベントループを作成し、新しいイベントループでコルーチンを実行します。これは、コードがret = loop2.run_until_complete(ret)行に永久にハングすることを除いて動作します。

今、私は結果を次のシナリオがあります。self.get()

  1. 結果は
    • 結果を返しますコルーチンではありません。 [良い] self.get()
  2. 結果は、コルーチン&イベントループは
    • 戻り結果(基本的イベントループと同じスレッドで)実行されていませんです。 は[良い] self.get()
  3. 結果は、コルーチン&イベントループは
    • ハングは永遠の結果を待っている(基本的イベントループとは別のスレッドで)実行されています。 [悪い]

は私が必要な値を得ることができるので、私は悪い結果を修正については行くことができますどのように誰もが知っていますか?ありがとう。

私はここで何か感謝したいと思います。

私はスレッドを使用するのに有効かつ有効な理由があります。具体的には、非同期ではないSQLAlchemyを使用しており、SQLAlchemyコードをThreadPoolExecutorにパントして安全に処理します。しかし、これらのスレッド内からこれらの非同期属性をクエリして、SQLAlchemyコードで特定の設定値を安全に取得できるようにする必要があります。そして、いいえ、私は必要なものを達成するためにSQLAlchemyから別のシステムに切り替えるつもりはありませんので、代替案を提示しないでください。プロジェクトはあまりにも基本的なものをそれに転換するためには遠すぎます。

asyncio.run_coroutine_threadsafe()loop.call_soon_threadsafe()を使用してみましたが、どちらも失敗しました。これまでのところ、これは動作させるために最も遠くになった、私はちょうど何かが明らかに欠けているように感じる。

私はチャンスがあるときに、問題の例を示すコードを書きます。

いいえ、私は事例を実装しましたが、それは私の予想通りに機能しました。だから私の問題はコードのどこかにある可能性が高いです。これを開いたままにして、私の本当の問題に合うように質問を変えます。

asyncio.run_coroutine_threadsafe()が結果を返すのではなく永久にハングする理由について考えられる人がいますか?

は私のエラーと重複しないことを私のコード例は、残念ながら、以下の通りです:

import asyncio 
import typing 

loop = asyncio.get_event_loop() 

class ConfigSimpleAttr: 
    __slots__ = ('value', '_is_async') 

    def __init__(
     self, 
     value: typing.Any, 
     is_async: bool=False 
    ): 
     self.value = value 
     self._is_async = is_async 

    async def _get_async(self): 
     return self.value 

    def __get__(self, inst, cls): 
     if self._is_async and loop.is_running(): 
      return self._get_async() 
     else: 
      return self.value 

class BaseConfig: 
    __slots__ =() 

    attr1 = ConfigSimpleAttr(10, True) 
    attr2 = ConfigSimpleAttr(20, True)  

    def get(self, key: str, default: typing.Any=None) -> typing.Any: 
     return getattr(self, key, default) 

    def get_sync(self, key: str, default: typing.Any=None) -> typing.Any: 
     ret = self.get(key, default) 

     if asyncio.iscoroutine(ret): 
      if loop.is_running(): 
       fut = asyncio.run_coroutine_threadsafe(ret, loop) 
       print(fut, fut.running()) 
       ret = fut.result() 
      else: 
       ret = loop.run_until_complete(ret) 

     return ret 

config = BaseConfig() 

def example_func(): 
    return config.get_sync('attr1') 

async def main(): 
    a1 = await loop.run_in_executor(None, example_func) 
    a2 = await config.attr2 
    val = a1 + a2 
    print('{a1} + {a2} = {val}'.format(a1=a1, a2=a2, val=val)) 
    return val 

loop.run_until_complete(main()) 

これは私のコードがやっている正確に何のストリップダウンバージョンであり、そして例は動作しますが、でも、私の場合実際のアプリケーションはそうではありません。私はどこで答えを探すべきかまで詰まっている。上記のコードが実際に問題を再現していない場合でも、私の "いつまでも立ち往生"問題をどこまで追跡しようとするのが歓迎です。

答えて

0

[OK]を私は、別のアプローチを取ることによって、私のコードを作業して得た。問題はファイルI/Oコンポーネントのloop.run_in_executor()を使用してコルーチンに変換していたファイルI/Oを使用していました。次に、別のスレッドから呼び出されているsync関数でこれを使用しようとしていました。その関数で別のloop.run_in_executor()を使って処理しました。これは私のコードでは非常に重要なルーチンです(私の短期実行コードの実行中にはおそらく何百万回も呼ばれています)。私のロジックはちょっと複雑すぎるという決定を下しました。それで...私はそれを複雑にしなかった。今、ファイルIOコンポーネントを非同期的に使用したい場合は、明示的に "get_async()"メソッドを使用します。それ以外の場合は、通常の属性アクセスで自分の属性を使用します。

ロジックの複雑さを取り除くことで、コードがより洗練され、理解しやすくなりました。さらに重要なことに、実際には機能します。私は問題の根本的な原因を知っていることを100%確信していませんが(属性を処理しているスレッドと何か関係があると考えていますが、処理を開始する前に属性を読み取ろうとする別のスレッドを開始します。競合状態のようなものが発生し、コードが停止することがありましたが、残念ながら完全に証明するためにアプリケーションの外部にエラーを複製することはできませんでした)、私はそれを過ぎて開発に取り組みました。

1

は、あなたがいくつかのイベントが同時にループを実行する必要があるということはほとんどありませんので、この部分は非常に間違っているになります。

if loop.is_running(): 
     loop2 = asyncio.new_event_loop() 
     try: 
      ret = loop2.run_until_complete(ret) 

     finally: 
      loop2.close() 
    else: 
     ret = loop.run_until_complete(ret) 

でもループはしていないようです実行しているかどうかテストします正しいアプローチ。それはget_syncに明示的に(のみ)、実行中のループを与えるとrun_coroutine_threadsafeを使用してコルーチンをスケジュールするために、おそらく良いでしょう:

def get_sync(self, key, loop): 
    ret = self.get(key, default) 
    if not asyncio.iscoroutine(ret): 
     return ret 
    future = asyncio.run_coroutine_threadsafe(ret, loop) 
    return future.result() 

EDIT:ハンギングの問題は間違ったループにスケジュールされたタスクに関連することができます(たとえば忘れコルーチンを呼び出す場合はオプションのloop引数)。この種の問題は、PR 303(現在はマージ)でデバッグしやすくする必要があります。ループと未来が一致しない場合はRuntimeErrorが代わりに生成されます。だから、あなたは最新バージョンのasyncioでテストを実行したいかもしれません。

+0

ええ、まったく同じ問題が発生しました。私は今、それに対していろいろなアプローチをとることを計画しています。私は問題をよりよく理解したと思うし、もっと良い例があることを望んでいる。エラーを再現できれば素晴らしいだろう。問題は、何百回も実行されてしまうことであり、少し時間がかかります。 –

+0

@CliffHill編集が役立つかどうかを確認してください。 – Vincent

+0

ありがとうございます。しかし、問題が発生した元のコードを作り直してしまいました。少なくとも私の元のアイデアは正しい道を辿っているという検証があります。 –

関連する問題