2016-08-12 6 views
2

私はアプリケーションを実行し、Gunicornnginxの後ろに置きます。 私のアプリケーションの初期化モジュールでは、web.run_app(app)を使用してアプリケーションを実行しませんが、Gunicornによってインポートされるインスタンスを作成して、各作業者で実行します。Gunicornが作成されます。 したがって、Gunicornは、いくつかのワーカープロセス、イベントループを作成してから、それらのループ内にアプリケーションの要求ハンドラrunsを作成します。aiohttpアプリケーションプロセスでZeroMQを聞きます

aiohttpアプリケーションは、私がGunicornによって開始されたアプリケーション・プロセスのいずれかで発生したイベントに通知したい接続WebSockets(モバイルアプリケーションクライアント)のコレクションを持っています。 そしてすべてWebSocketsに連絡し、すべてのアプリケーションプロセスに接続したいとします。 したがって、私はZeroMQを使って何らかのアップストリームプロキシを作成し、各アプリケーションプロセスからzmq.SUBソケットを使用して購読したいと考えています。

...だから基本的に私は、各アプリケーションの労働者にこのような何かを達成したい:

context = zmq.Context() 
socket = context.socket(zmq.SUB) 
socket.connect('tcp://localhost:5555') 

while True: 
    event = socket.recv() 
    for ws in app['websockets']: 
     ws.send_bytes(event) 
    # break before app shutdown. How? 

にはどうすればWebSocketsにメッセージを転送するためにaiohttpアプリケーション内ZeroMQプロキシを聞くことができますか?

イベントループ内でこのコードをバックグラウンドで実行する方法と、アプリケーションのライフサイクルであるaiohttp内で正しく実行してシャットダウンする方法を教えてください。


UPDATE

私はすでに問題を説明し、可能な解決策を提案aiohttpのGitHubのリポジトリにissueを作成しました。ここに記載された問題については、私は非常に感謝しています。

答えて

1

[OK]を、質問およびこのissue上の議論は、すなわち、バージョン1.0 に我々はApplication.on_startup()メソッドを使用してon_startupアプリケーション信号を登録する機能を持っているだろう、私はaiohttpに貢献してきた新機能につながっています。

Documentation
Working example on the master branch

#!/usr/bin/env python3 
"""Example of aiohttp.web.Application.on_startup signal handler""" 
import asyncio 

import aioredis 
from aiohttp.web import Application, WebSocketResponse, run_app 

async def websocket_handler(request): 
    ws = WebSocketResponse() 
    await ws.prepare(request) 
    request.app['websockets'].append(ws) 
    try: 
     async for msg in ws: 
      print(msg) 
      await asyncio.sleep(1) 
    finally: 
     request.app['websockets'].remove(ws) 
    return ws 


async def on_shutdown(app): 
    for ws in app['websockets']: 
     await ws.close(code=999, message='Server shutdown') 


async def listen_to_redis(app): 
    try: 
     sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop) 
     ch, *_ = await sub.subscribe('news') 
     async for msg in ch.iter(encoding='utf-8'): 
      # Forward message to all connected websockets: 
      for ws in app['websockets']: 
       ws.send_str('{}: {}'.format(ch.name, msg)) 
      print("message in {}: {}".format(ch.name, msg)) 
    except asyncio.CancelledError: 
     pass 
    finally: 
     print('Cancel Redis listener: close connection...') 
     await sub.unsubscribe(ch.name) 
     await sub.quit() 
     print('Redis connection closed.') 


async def start_background_tasks(app): 
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app)) 


async def cleanup_background_tasks(app): 
    print('cleanup background tasks...') 
    app['redis_listener'].cancel() 
    await app['redis_listener'] 


async def init(loop): 
    app = Application(loop=loop) 
    app['websockets'] = [] 
    app.router.add_get('/news', websocket_handler) 
    app.on_startup.append(start_background_tasks) 
    app.on_cleanup.append(cleanup_background_tasks) 
    app.on_shutdown.append(on_shutdown) 
    return app 

loop = asyncio.get_event_loop() 
app = loop.run_until_complete(init(loop)) 
run_app(app) 
関連する問題