2016-04-16 12 views
2

私は、ジョブごとにsubprocess.popenを使用して、5つのバッチで実行したい非同期ジョブのグループ(約100)を持っています。私の計画は次のようになりますので、非同期ジョブのサブバッチを処理する方法は?

  1. ジョブの一覧
  2. 投票から最初の5つのジョブを実行し、アクティブジョブ毎分または
  3. (各ジョブの実行には数分かかります)ジョブが実行された場合、常に我々は全体のジョブリスト

を通じて行ってきたまで続行

  • 時5つのジョブを実行していることはPythonでこれを行うには、既知のパターンがあることを保証し、次のジョブを実行しますか?

  • 答えて

    2

    Python 2では、multiprocessing.Poolsubprocessの組み合わせを使用しました。しかし、これはプールのプロセスの形で余分なオーバーヘッドを持っています。

    したがって、私はmultiprocessing.poolの代わりにconcurrent.futures.ThreadPoolExecutorを使用します。

    以下のコードは、ThreadPoolExecutorの使用方法を示しています。

    import concurrent.futures as cf 
    import logging 
    import os 
    
    errmsg = 'conversion of track {} failed, return code {}' 
    okmsg = 'finished track {}, "{}"' 
    num = len(data['tracks']) 
    with cf.ThreadPoolExecutor(max_workers=os.cpu_count()) as tp: 
        fl = [tp.submit(runflac, t, data) for t in range(num)] 
        for fut in cf.as_completed(fl): 
         idx, rv = fut.result() 
         if rv == 0: 
          logging.info(okmsg.format(idx+1, data['tracks'][idx])) 
         else: 
          logging.error(errmsg.format(idx+1, rv)) 
    

    runflac機能は、音楽ファイルを変換するflac(1)を呼び出すためにsubprocessを使用しています。

    import subprocess 
    
    def runflac(idx, data): 
        """Use the flac(1) program to convert a music file to FLAC format. 
    
        Arguments: 
         idx: track index (starts from 0) 
         data: album data dictionary 
    
        Returns: 
         A tuple containing the track index and return value of flac. 
        """ 
        num = idx + 1 
        ifn = 'track{:02d}.cdda.wav'.format(num) 
        args = ['flac', '--best', '--totally-silent', 
          '-TARTIST=' + data['artist'], '-TALBUM=' + data['title'], 
          '-TTITLE=' + data['tracks'][idx], 
          '-TDATE={}'.format(data['year']), 
          '-TGENRE={}'.format(data['genre']), 
          '-TTRACKNUM={:02d}'.format(num), '-o', 
          'track{:02d}.flac'.format(num), ifn] 
        rv = subprocess.call(args, stdout=subprocess.DEVNULL, 
             stderr=subprocess.DEVNULL) 
        return (idx, rv) 
    

    更新:Pythonの2.7で

    は、もう少し複雑しかしことを回避する別の技術があるマルチプロセッシングプールを使用する際のオーバーヘッド。

    基本的な形式は次のとおりです。

    starter = functools.partial(startencoder, crf=args.crf, preset=args.preset) 
    procs = [] 
    maxprocs = cpu_count() 
    for ifile in args.files: 
        while len(procs) == maxprocs: 
         manageprocs(procs) 
        procs.append(starter(ifile)) 
    while len(procs) > 0: 
        manageprocs(procs) 
    

    (。。functools.partialを使用すると、機能のデフォルトの引数を設定する方法であることが原則に関係ありません)startencoder関数は基本的にsubprocess.Popenのラッパーですが、 Popenインスタンスを除くいくつかの追加情報を返します。

    def startencoder(fname, crf, preset): 
        """ 
        Use ffmpeg to convert a video file to H.264/AAC streams in an MP4 
        container. 
    
        Arguments: 
         fname: Name of the file to convert. 
         crf: Constant rate factor. See ffmpeg docs. 
         preset: Encoding preset. See ffmpeg docs. 
    
        Returns: 
         A 3-tuple of a Process, input path and output path. 
        """ 
        basename, ext = os.path.splitext(fname) 
        known = ['.mp4', '.avi', '.wmv', '.flv', '.mpg', '.mpeg', '.mov', '.ogv', 
          '.mkv', '.webm'] 
        if ext.lower() not in known: 
         ls = "File {} has unknown extension, ignoring it.".format(fname) 
         logging.warning(ls) 
         return (None, fname, None) 
        ofn = basename + '.mp4' 
        args = ['ffmpeg', '-i', fname, '-c:v', 'libx264', '-crf', str(crf), 
          '-preset', preset, '-flags', '+aic+mv4', '-c:a', 'libfaac', 
          '-sn', '-y', ofn] 
        try: 
         p = subprocess.Popen(args, stdout=subprocess.DEVNULL, 
              stderr=subprocess.DEVNULL) 
         logging.info("Conversion of {} to {} started.".format(fname, ofn)) 
        except: 
         logging.error("Starting conversion of {} failed.".format(fname)) 
        return (p, fname, ofn) 
    

    重要なことはmanageprocs機能である:

    def manageprocs(proclist): 
        """ 
        Check a list of subprocesses tuples for processes that have ended and 
        remove them from the list. 
    
        Arguments: 
         proclist: a list of (process, input filename, output filename) 
           tuples. 
        """ 
        nr = '# of conversions running: {}\r'.format(len(proclist)) 
        logging.info(nr) 
        sys.stdout.flush() 
        for p in proclist: 
         pr, ifn, ofn = p 
         if pr is None: 
          proclist.remove(p) 
         elif pr.poll() is not None: 
          logging.info('Conversion of {} to {} finished.'.format(ifn, ofn)) 
          proclist.remove(p) 
        sleep(0.5) 
    
    +0

    私は、Python 2.7で働いているので、私はプールに行きますよ。ありがとう! – John

    +0

    @John私は2.7のコードで掘り下げて、 'Pool'を使用するオーバーヘッドを避けるソリューションを見つけました。私はこれをアップデートとして追加しました。 –

    +0

    素晴らしい!とても有難い。 – John

    関連する問題