0

私はprogressbarモジュールに非常に満足しています。私はStdOutリダイレクト機能で多くのことを使用しています。最近、私は(pathos)マルチプロセッシングを使い始めましたが、2つを組み合わせて動作させることはできません。Pythonはマルチプロセスを使ってstdout(progressbar)をリダイレクトします

また、キーボードの割り込みにいくつか問題がありましたが、これはa bug in Python2が原因であることが判明しました。私はこの問題に関連する場合に対処するために使用するコードを追加しました。

さらに、さまざまな地図機能を使って遊ぶことが多くの問題を解決できることに気付きました。私はimapを使用しています。中間結果をcsvファイルに書き出し、もちろんプログレスバーを表示したいからです。

私はStdOutで自分自身を演奏し、インターネット上でいくつかの提案を試みました。しかし、私はいつも2つの望ましくない状況に終わる。

次のいずれかの

  1. STDOUTがリダイレクトされません取得し、プログレスバーは、各print文の後に繰り返されます。
  2. StdOutはリダイレクトされますが、ワーカーの出力は表示されません。

ここでは私の問題を実証するいくつかのおもちゃのコードです:

import time, signal, multiprocessing 
import progressbar 


def do_work(number): 
    if not number % 500: 
     print 'Special log occasion ...' 
    time.sleep(0.1) 

def example(redirect_stdout): 
    workers = multiprocessing.cpu_count() 
    num_tasks = 1000 
    pbar = progressbar.ProgressBar(widgets=[progressbar.Bar()], max_value=num_tasks, redirect_stdout=redirect_stdout) 
    pbar.start() 

    # Start a with SIGINT turned of, so that workers can be interrupted 
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) 
    pool = multiprocessing.Pool(processes=workers) 
    signal.signal(signal.SIGINT, original_sigint_handler) 


    for i, _ in enumerate(pool.imap(do_work, xrange(num_tasks)), 1): 
     pbar.update(i) 

    pool.close() 
    pool.join() 
    pbar.finish() 

print "Case1: Progressbar without redirecting output:" 
example(False) 
print "\nCase1: Progressbar without redirecting output:" 
example(True) 

出力:

Case1: Progresspar without redirecing output: 
Special log occasion ... 
|######################      | 
Special log occasion ... 
|#############################################| 


Case2: Progresspar with redirecing output: 
|#############################################| 

答えて

1

複数のプロセスを使用して同じ出力ストリームへの書き込みは、常に同期の問題を起こしやすい、または悪化し、データの上書き/欠落。幸いにもそれがこの問題を回避することは難しいことではありません:)

# vim: set fileencoding=utf-8 
import six 
import sys 
import time 
import signal 
import multiprocessing 
import progressbar 


def do_work(number): 
    if not number % 50: 
     print 'Special log occasion ...' 
     sys.stdout.flush() 
    time.sleep(0.1) 


class IOQueue(six.StringIO): 

    ''' 
    Very poor and simple IO wrapper which only functions for simple print 
    statements 
    ''' 

    def __init__(self, queue, *args, **kwargs): 
     six.StringIO.__init__(self, *args, **kwargs) 
     self.queue = queue 

    def write(self, value): 
     self.queue.put(value) 


def example(redirect_stdout): 
    workers = multiprocessing.cpu_count() 
    num_tasks = 1000 
    pbar = progressbar.ProgressBar(
     widgets=[progressbar.Bar()], 
     max_value=num_tasks, 
     redirect_stdout=redirect_stdout, 
    ) 
    # Start a with SIGINT turned of, so that workers can be interrupted 
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) 

    stdout_queue = multiprocessing.Queue() 

    def initializer(queue): 
     sys.stdout = IOQueue(queue) 

    pool = multiprocessing.Pool(
     processes=workers, initializer=initializer, initargs=[stdout_queue]) 
    signal.signal(signal.SIGINT, original_sigint_handler) 

    for i, _ in enumerate(pool.imap(do_work, xrange(num_tasks)), 1): 
     while not stdout_queue.empty(): 
      sys.stdout.write(stdout_queue.get()) 

     pbar.update(i) 

    pool.close() 
    pool.join() 
    pbar.finish() 

example(True) 

上記のコードは、すべての労働者がプログレスバーを更新する前に、通常の標準出力に書き込まれマルチプロセッシング・キューに標準出力データを書き込むことができます。

関連する問題