2016-03-23 9 views
1

私は大量のutils.getProcessOutputAndValue('cmd', [args])コマンドを実行しています。結果はtask.react()またはreactor.run()OSError:[Errno 24] Twistedでreactor.run()を使用したときに開いているファイルが多すぎる

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

from progress.bar import IncrementalBar 
from twisted.internet import defer 
from twisted.internet import task 
from twisted.internet import utils 
from twisted.python import usage 


class Options(usage.Options): 
    optFlags = [['reactor', 'r', 'Use reactor.run().'], 
       ['task', 't', 'Use task.react().'], 
       ['cwr', 'w', 'Use callWhenRunning().']] 
    optParameters = [['limit', 'l', 255, 'Number of file descriptors to open.'], 
        ['cmd', 'c', 'echo Testing {i}...', 'Command to run.']] 


def run(opt): 
    limit = int(opt['limit']) 
    cmd, args = opt['cmd'].split(' ', 1) 
    bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit) 
    requests = [] 
    for i in range(0, limit): 
     try: 
      _args = args.format(i=i) 
      args = _args 
     except KeyError: 
      pass 
     requests.append(utils.getProcessOutputAndValue('echo', [args])) 
     bar.next() 
    bar.finish() 
    return defer.gatherResults(requests) 


@defer.inlineCallbacks 
def main(reactor, opt): 
    d = defer.Deferred() 
    limit = int(opt['limit']) 
    cmd, args = opt['cmd'].split(' ', 1) 
    bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit) 
    for i in range(0, limit): 
     try: 
      _args = args.format(i=i) 
      args = _args 
     except KeyError: 
      pass 
     yield utils.getProcessOutputAndValue('echo', [args]) 
     bar.next() 
    bar.finish() 
    defer.returnValue(d.callback(True)) 


if __name__ == '__main__': 
    opt = Options() 
    opt.parseOptions() 

    if opt['reactor']: 
     from twisted.internet import reactor 
     task.deferLater(reactor, 0, run, opt) 
     reactor.run() 

    elif opt['task']: 
     from twisted.internet.task import react 
     react(main, [opt]) 

    elif opt['cwr']: 
     from twisted.internet import reactor 
     reactor.callWhenRunning(run, opt) 
     reactor.run() 

私は次のエラーを取得(私の場合)400以上のlimitを使用して:私はuの午前場合

Upon execvpe echo ['echo', 'Testing 0...'] in environment id 42131264 
:Traceback (most recent call last): 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 428, in _fork 
    self._setupChild(**kwargs) 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 803, in _setupChild 
    for fd in _listOpenFDs(): 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 638, in _listOpenFDs 
    return detector._listOpenFDs() 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 553, in _listOpenFDs 
    self._listOpenFDs = self._getImplementation() 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 576, in _getImplementation 
    after = impl() 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 606, in _procFDImplementation 
    return [int(fd) for fd in self.listdir(dname)] 
OSError: [Errno 24] Too many open files: '/proc/23421/fd' 
Unhandled error in Deferred: 

を発生しませんが履歴書にはtask.react()

を歌う:

  • python pyerr.py -l100 -r
  • python pyerr.py -l100 -t OK
  • python pyerr.py -l100 -w OK:OK
  • python pyerr.py -l400 -rOSERR
  • python pyerr.py -l400 -tOK
  • python pyerr.py -l400 -wOSERR

問題は、そのアプリケーションがSMTP接続に対応しているため、私は、原子炉を使用し、大きなアプリケーションを持っているということである(そうtask.reactを使用することはできません、私は原子炉を止めたくない)。

私はいつも延期行って、私はこれ以上のものをやっていると思います...


編集されるとtask.reactのみ原子炉を停止したことを考えた:task.reactreactor.runためここでpstree comparaison

reactor.run(パイソンpyerr.py -l400 -r)

init-+-VBoxService---7*[{VBoxService}] 
    |-acpid 
    |-atd 
    |-cron 
    |-dbus-daemon 
    |-dhclient 
    |-6*[getty] 
    |-master-+-pickup 
    |  `-qmgr 
    |-mysqld---18*[{mysqld}] 
    |-nginx---4*[nginx] 
    |-php5-fpm---2*[php5-fpm] 
    |-puppet---{puppet} 
    |-rpc.idmapd 
    |-rpc.statd 
    |-rpcbind 
    |-rsyslogd---3*[{rsyslogd}] 
    |-ruby---{ruby} 
    |-sshd-+-3*[sshd---sshd---sftp-server] 
    |  |-sshd---sshd---2*[sftp-server] 
    |  |-sshd---sshd---bash---pstree 
    |  `-sshd---sshd---bash---python-+-323*[echo] 
    |         `-5*[python] 
    |-systemd-logind 
    |-systemd-udevd 
    |-upstart-file-br 
    |-upstart-socket- 
    `-upstart-udev-br 

task.react(パイソンpyerr.py -l400 -t)

init-+-VBoxService---7*[{VBoxService}] 
    |-acpid 
    |-atd 
    |-cron 
    |-dbus-daemon 
    |-dhclient 
    |-6*[getty] 
    |-master-+-pickup 
    |  `-qmgr 
    |-mysqld---18*[{mysqld}] 
    |-nginx---4*[nginx] 
    |-php5-fpm---2*[php5-fpm] 
    |-puppet---{puppet} 
    |-rpc.idmapd 
    |-rpc.statd 
    |-rpcbind 
    |-rsyslogd---3*[{rsyslogd}] 
    |-ruby---{ruby} 
    |-sshd-+-3*[sshd---sshd---sftp-server] 
    |  |-sshd---sshd---2*[sftp-server] 
    |  |-sshd---sshd---bash---pstree 
    |  `-sshd---sshd---bash---python---echo 
    |-systemd-logind 
    |-systemd-udevd 
    |-upstart-file-br 
    |-upstart-socket- 
    `-upstart-udev-br 

注意この

|  `-sshd---sshd---bash---python-+-323*[echo] 
|         `-5*[python] 

一のCASにおけるこの

|  `-sshd---sshd---bash---python---echo 

差プロセスが完了するとすぐに閉じられないようです。

  • のUbuntu 14.04
  • CentOSに6
  • CentOSに7

問題はまったく同じです:

私は4台の異なるマシン上でこの問題をテストしています。

ショットを出すには、watch -n 0.1 "pstree"を実行して、プロセスが進化している様子を見てください。


編集:これは答えをグリフに感謝を起こったが、どのように私の本当の人生の場合にこれを適応させることである、なぜ私はそれを得ますか?私はツイストして開発しています

アプリケーションが(私たちは、電子メールの署名を確認したいと仮定し)、それは働くここにどのように、のMilterに基づいてSMTPフィルタである:

  • 接続は、ポート25
  • のmilterで開きますプロトコル
  • milterは、/usr/bin/openssl mimeで署名チェックを処理するリモート "モジュール"サーバを呼び出します。
  • このモジュールは、署名が有効かどうかを示す応答を返します

この場合、150個の同時接続があり、モジュール(TCPプロトコル)への呼び出しが150回あり、このモジュールは接続ごとに1回opensslコマンドを呼び出します。

モジュールは完全に独立しているため、他のコールが実行されているかどうかはわかりません。あなたの意見にはどこにDeferredSemaphoreを入れますか?

ここで私の問題は、smtp接続もagnosticsであり、他の可能なオープン接続について知らないということです。

あなたの意見にこの平行四辺形を正しく処理する方法はありますか?

+0

それは*すべて* 'task.react'です。あなたのサンプルをLinuxサーバーに貼り付けると、あなたが提供したすべての値に対してサンプルを正常に実行することができたので、何がうまくいかないのか分かりません。おそらく、いくつかのローカル設定に問題がありますか? – Glyph

+0

私は2つの例を 'pstree'で実行しました。' -r'スイッチを使うと、私は300のpythonサブプロセスを見たので、朝に詳細なサンプルを貼り付けます。私はプロセスが一旦終了するとハングアップし、/ procの制限エラーを引き起こす印象を受けました。 – kitensei

+0

私は 'pstree'グラフを追加しました。私のテストを少し詳しく説明しました。この問題は3種類のOSで4種類のマシンでテストされています。私は結果を修正したことに注意してください:** - r **スイッチ 'python pyerr.py -l400 -r'は失敗します。** - t ** – kitensei

答えて

2

ここで問題task.reactreactor.runの区別とは何の関係もありませんが、むしろ、あなたのrunmain関数の実装の間に微妙だが、有意差。

違いはrunが簡単にシステムの限界を吹いて、同時オープンファイル記述子の数千人をラッキング、並列にlimitプロセスを産卵していることです。しかし、mainは、すべてのプロセスが完全に終了してから次のプロセスを起動するまで待っています。つまり、一度に4つ以上のプロセスを使用することはありません。

理由はmaininlineCallbacksで装飾され、そのDeferredが完了するまでmainの実行を中断し、すべてのgetProcessOutputAndValueDeferredを得ていることです。

実際のアプリケーションでは、どちらの方法も理想的ではありません。いくつかの並列性が必要ですが、無制限ではありません。 Twistedには、一度に1つのタスクのみを実行するようにすべてを制限することなく、限られた並列処理を容易にするために、DeferredSemaphoreなどのユーティリティが付属しています。 Jean-Paul Calderoneは10年前に記事を書いた! - これを使用する方法については、hereを参照してください。

しかし、ただ問題はtask.reactとは何の関係もないことを実証するために、ここにrun機能を排除し、mainを使用してリンゴ・ツー・リンゴを比較して、あなたの例の修正版です:

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

from progress.bar import IncrementalBar 
from twisted.internet import defer 
from twisted.internet import task 
from twisted.internet import utils 
from twisted.python import usage 


class Options(usage.Options): 
    optFlags = [['reactor', 'r', 'Use reactor.run().'], 
       ['task', 't', 'Use task.react().'], 
       ['cwr', 'w', 'Use callWhenRunning().']] 
    optParameters = [['limit', 'l', 255, 'Number of file descriptors to open.'], 
        ['cmd', 'c', 'echo Testing {i}...', 'Command to run.']] 


@defer.inlineCallbacks 
def main(reactor, opt): 
    d = defer.Deferred() 
    limit = int(opt['limit']) 
    cmd, args = opt['cmd'].split(' ', 1) 
    bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit) 
    for i in range(0, limit): 
     try: 
      _args = args.format(i=i) 
      args = _args 
     except KeyError: 
      pass 
     yield utils.getProcessOutputAndValue('echo', [args]) 
     bar.next() 
    bar.finish() 
    defer.returnValue(d.callback(True)) 


if __name__ == '__main__': 
    opt = Options() 
    opt.parseOptions() 

    if opt['reactor']: 
     from twisted.internet import reactor 
     task.deferLater(reactor, 0, main, reactor, opt) 
     reactor.run() 

    elif opt['task']: 
     from twisted.internet.task import react 
     react(main, [opt]) 

    elif opt['cwr']: 
     from twisted.internet import reactor 
     reactor.callWhenRunning(main, reactor, opt) 
     reactor.run() 

は、 編集、質問に編集する応答:あなたの本当の問題がちょうどforループの着信接続であり、そしてないので

、むしろDeferredSemaphoreを使用するよりも、あなたの代わりにカウンターを維持し、オブジェクトがlistenTCP、またはTCP4ServerEndpointから戻ってくるDeferredの結果から返されたという事実を利用する、IPushProducerを実装し、そして時にあまりにも多く、それにpauseProducing()を呼び出す必要があります同時接続は作業を行い、その作業が完了するとresumeProducing()となります。

+0

ありがとう、私は完全にそれを得る、私が掘るほど、私はツイストを愛するほど;)。あなたは私の最後の編集をチェックして、私が実際のケースでどのように進むべきか見てみてください。 – kitensei

+0

あなたのために編集が追加されました。 – Glyph

+0

お返事ありがとうございます;) – kitensei

関連する問題