2016-12-26 22 views
1

私は、APSchedulerジョブが起動していないPythonコードをいくつか持っています。コンテキストとして、私はまた、マルチスレッドを行うためにeventlet/GreenPoolを使用してファイルの変更のためのディレクトリを探しているハンドラも持っています。いくつかのトラブルシューティングに基づいて、APSchedulerとeventletの間に何らかの矛盾があるようです。APSchedulerタスクがイベントレットのために起動しないmonkey_patch

次のように私の出力はなります

2016年12月26日2時30分30秒UTC(0000):完成ダウンロードは
2016年12月26日2時46分07秒UTCを渡します(+ 0000):制御Cまたは他の終了信号による終了
Jobstoreデフォルト:
時間起動ダウンロード(トリガー:間隔[0:05:00]、次回実行時:2016-12-25 18:35:00) PST) 2016-12-26 02:46:07 UTC(+0000):1

(18:35 PST = 2:35 UTC)...ので、私はeventletがで含まれて一緒にprocessAllQueues機能のほとんどをコメントアウトした場合、私はコントロール-C

from apscheduler import events ## pip install apscheduler 
from apscheduler.schedulers.background import BackgroundScheduler 

# Threading 
from eventlet import patcher, GreenPool ## pip install eventlet 
patcher.monkey_patch(all = True) 

def setSchedule(scheduler, cfg, minutes = 60*2, hours = 0): 
    """Set up the schedule of how frequently a download should be attempted. 
    scheduler object must already be declared. 
    will accept either minutes or hours for the period between downloads""" 
    if hours > 0: 
    minutes = 60*hours if minutes == 60 else 60*hours+minutes 
    handle = scheduler.add_job(processAllQueues, 
          trigger='interval', 
          kwargs={'cfg': cfg}, 
          id='RQmain', 
          name='Time-Activated Download', 
          coalesce=True, 
          max_instances=1, 
          minutes=minutes, 
          start_date=dt.datetime.strptime('2016-10-10 00:15:00', '%Y-%m-%d %H:%M:%S') # computer's local time 
) 
    return handle 

def processAllQueues(cfg): 
    SQSpool = GreenPool(size=int(cfg.get('GLOBAL','Max_AWS_Connections'))) 
    FHpool = GreenPool(size=int(cfg.get('GLOBAL','Max_Raw_File_Process'))) 
    arSects = [] 
    dGlobal = dict(cfg.items('GLOBAL')) 
    for sect in filter(lambda x: iz.notEqualz(x,'GLOBAL','RUNTIME'),cfg.sections()): 
    dSect = dict(cfg.items(sect)) # changes all key names to lowercase 
    n = dSect['sqs_queue_name'] 
    nn = dSect['node_name'] 
    fnbase = "{}_{}".format(nn,n) 
    dSect["no_ext_file_name"] = os.path.normpath(os.path.join(cfg.get('RUNTIME','Data_Directory'),fnbase)) 
    arSects.append(mergeTwoDicts(dGlobal,dSect)) # section overrides global 
    arRes = [] 
    for (que_data,spec_section) in SQSpool.imap(doQueueDownload,arSects): 
    if que_data: fileResult = FHpool.spawn(outputQueueToFiles,spec_section,que_data).wait() 
    else: fileResult = (False,spec_section['sqs_queue_name']) 
    arRes.append(fileResult) 
    SQSpool.waitall() 
    FHpool.waitall() 
    pr.ts_print("Finished Download Pass") 
    return None 

def main(): 
    cfgglob = readConfigs(cfgdir, datdir) 
    sched = BackgroundScheduler() 
    cron_job = setSchedule(sched, cfgglob, 5) 
    sched.start(paused=True) 
    try: 
    change_handle = win32file.FindFirstChangeNotification(cfgdir, 0, win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE) 
    processAllQueues(cfgglob) 
    sched.resume() # turn the scheduler back on and monitor both wallclock and config directory. 
    cron_job.resume() 
    while 1: 
     SkipDownload = False 
     result = win32event.WaitForSingleObject(change_handle, 500) 
     if result == win32con.WAIT_OBJECT_0: # If the WaitForSO returned because of a notification rather than error/timing out 
     sched.pause() # make sure we don't run the job as a result of timestamp AND file modification 
     while 1: 
      try: 
      win32file.FindNextChangeNotification(change_handle) # rearm - done at start because of the loop structure here 
      cfgglob = None 
      cfgglob = readConfigs(cfgdir,datdir) 
      cron_job.modify(kwargs={'cfg': cfgglob}) # job_id="RQmain", 
      change_handle = win32file.FindFirstChangeNotification(cfgdir, 0, win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE) # refresh handle 
      if not SkipDownload: processAllQueues(cfgglob) 
      sched.resume() 
      cron_job.resume() 
      break 
    except KeyboardInterrupt: 
    if VERBOSE | DEBUG: pr.ts_print("EXITING due to control-C or other exit signal") 
    finally: 
    sched.print_jobs() 
    pr.ts_print(sched.state) 
    sched.shutdown(wait=False) 

を押す前に、それが11分を解雇している必要がありますトップ、それは適切に発火する。私は

from eventlet import patcher, GreenPool ## pip install eventlet 
patcher.monkey_patch(all = True) 

を保つが、最後から2番目の行の印字行までprocessAllQueuesをコメントアウトした場合、それはパッチとGreenPoolをインポートするとかとの問題のいずれかがあることを示し、APSchedulerを発射に失敗しましたmonkey_patch文。 patcher.monkey_patch(all = True)をコメントアウトすると、もう一度「仕事」になります。

私の状況では別のmonkey_patch文がどのように動作するのでしょうか?

+0

https://github.com/eventlet/eventlet/issues/365を参照してください。 – mpag

答えて

1

ファイルの変更を監視する明示的なイベントループがあります。これにより、イベントレットのイベントループが実行されなくなります。次の2つのオプションがあります:前/通話を遮断した後

  • ラップブロッキング呼び出し(などwin32event.WaitForSingleObject())を実行しeventlet.sleep()
  • eventlet.tpool.execute()中に、あなたはあまりにも長い間ブロックしないことを確認してください。

eventlet.monkey_patch(thread=False)は、他のすべてのモジュールを真にリストするより短い代替方法です。一般に、ロックまたはスレッドローカルストレージを使用する場合、またはスレッドAPIを使用して緑のスレッドを生成する場合は、thread=Trueが必要です。面白いGUIフレームワークのように、本当にOSスレッドを使用するのであれば、thread=Falseが必要な場合があります。

重要なプロジェクトを実行するためにWindowsでEventletを使用することは実際には考慮しないでください。パフォーマンスはPOSIXよりずっと劣っています。私は0.17からWindows上でテストを実行しませんでした。それは、普及したデスクトッププラットフォームでの開発の容易さのためです。

関連する問題