2016-11-03 1 views
1

asyncioaiohttpを使用してイベントをポストするプログラムを作成しました。このプログラムは、ローカルで実行すると動作します。私は10kイベントを問題なく投稿できます。しかし、私は、リモートマシンに全体のコードベースをSCPed、そのマシンの中に、私はこのエラーを取得せずに15の以上のイベントをポストすることはできません。asynioとaiohttpを使用して、リモートマシンとローカルのポストを多数作成できない

RuntimeError: Event loop is closed 
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a53989410> 
Traceback (most recent call last): 
    File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__ 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed 
RuntimeError: Event loop is closed 
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a5397ffc0> 
Traceback (most recent call last): 
    File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__ 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed 
RuntimeError: Event loop is closed 

どのように私はこれをデバッグするか、この問題の原因を見つけることができますか?ここで

は、私が作成したクラスであり、私は実行するメソッド post()を使用します。それが閉じていたら、あなたはループを再使用することはできません

import uuid 
import os 
import asyncio 
import time 
import random 
import json 
import aiohttp 
from tracer.utils.phase import Phase 

class Poster(Phase): 
    def __init__(self, log, endpoint, num_post, topic, datafile, timeout, oracles, secure=False, thru_proxy=True): 
     Phase.__init__(self, log, "post", oracles, secure, thru_proxy) 
     self.log = log 
     self.num_post = int(num_post) 
     self.datafile = datafile.readlines() 
     self.topic = topic 
     self.endpoint = self.set_endpoint(endpoint, self.topic) 
     self.response = None 
     self.timeout = timeout 

    def random_line(self): 
     """ Returns random line from file and converts it to JSON """ 
     return json.loads(random.choice(self.datafile)) 

    @staticmethod 
    def change_uuid(event): 
     """ Creates new UUID for event_id """ 
     new_uuid = str(uuid.uuid4()) 
     event["event_header"]["event_id"] = new_uuid 
     return event 

    @staticmethod 
    def wrapevent(event): 
     """ Wrap event with metadata for analysis later on """ 
     return { 
      "tracer": { 
       "post": { 
        "statusCode": None, 
        "timestamp": None, 
       }, 
       "awsKafkaTimestamp": None, 
       "qdcKakfaTimestamp": None, 
       "hdfsTimestamp": None 
      }, 
      "event": event 
     } 

    def gen_random_event(self): 
     random_event = self.random_line() 
     event = self.change_uuid(random_event) 
     dataspec = self.wrapevent(event) 
     return dataspec 

    async def async_post_event(self, event, session): 
     async with session.post(self.endpoint, data=event, proxy=self.proxy) as resp: 
      event["tracer"]["post"]["timestamp"] = time.time() * 1000.0 
      event["tracer"]["post"]["statusCode"] = resp.status 
      unique_id = event["event"]["event_header"]["event_id"] 
      oracle_endpoint = os.path.join(self.oracle, unique_id) 
     async with session.put(oracle_endpoint, data=json.dumps(event), proxy=self.proxy) as resp: 
      if resp.status != 200: 
       self.log.debug("Post to ElasticSearch not 200") 
       self.log.debug(event["event"]["event_header"]["event_id"]) 
       self.log.debug("Status code: " + str(resp.status)) 
      return event["event"]["event_header"]["event_id"], resp.status 

    async def async_post_events(self, events): 
     coros = [] 
     conn = aiohttp.TCPConnector(verify_ssl=self.secure) 
     async with aiohttp.ClientSession(connector=conn) as session: 
      for event in events: 
       coros.append(self.async_post_event(event, session)) 
      return await asyncio.gather(*coros) 

    def post(self): 
     event_loop = asyncio.get_event_loop() 
     try: 
      events = [self.gen_random_event() for i in range(self.num_post)] 
      start_time = time.time() 
      results = event_loop.run_until_complete(self.async_post_events(events)) 
      print("Time taken: " + str(time.time() - start_time)) 
     finally: 
      event_loop.close() 

答えて

2

AbstractEventLoop.closeからドキュメント:

This is idempotent and irreversible. No other methods should be called after this one.

loop.closeの呼び出しを削除するか、各ポストのための新たなループを作成してください。

私の助言は、ループ内のすべてを実行し、必要なときにはasync_post_eventsを待ってこれらの問題を回避することです。

関連する問題