2016-09-17 14 views
-1

私たちはいくつかの物理モデルのパラメータ空間をランダムにスキャンするために大規模なPythonコードを終了しています(したがって、最小限の例を与えるのは非常に困難です)。 1つのパラメータポイントを評価するのに約300msかかりますが、コンピューティングクラスタ上のCPU予算を犠牲にする評価が突然数時間かかることがあります。threading.join()はタイムアウトをどのように検出しますか?

私の考えは、パラメータポイントの各評価に実行の最大時間を与えるためにスレッディングを使用することでした。評価に時間がかかる場合は、この点を非物理的なものとして無視することができます。今、これはうまくいかないようです。私は新しいスレッドで計算を開始し、メインスレッドにタイムアウトを1秒に設定して結合しますが、メインスレッドは計算が終了するのを待っています(これは1秒よりかなり長い時間がかかります)。

これはどのように可能ですか?スレッディングは、新しいスレッドがすでに実行されている時間をどのように測定しますか? 私は、1つのパラメータポイントの評価中に、nlopt、numpy、scipyを大いに使用していると言わなければなりません。その大部分は、私が想定しているように、Pythonには直接記述されていませんが、計算のスピードを上げるためにバイナリが使用されています。これはスレッディングに影響を及ぼしますか(関数はそれに「ブラックボックス」なので)?

ありがとうございます!

+0

[join() '](https://docs.python.org/3.5/library/threading.html#threading.Thread.join)のドキュメントを読んだことがありますか? 'join()'は常に 'None'を返すので、' join() 'の後に' is_alive() 'を呼び出してタイムアウトが起きたかどうかを判断する必要があります** - スレッドがまだ生きている場合、' join () '呼び出しがタイムアウトしました。* – Bakuriu

+0

また、標準インターフェイスはスレッドを殺す方法を提供しません**。あなたは一層簡単に殺すことができるマルチ*処理*を使う方が良いです。 – Bakuriu

答えて

0

短い答え:

私はthreading.joinチェックタイムアウトを考えていません。タイムアウトしたかどうかを確認する必要があります。

  • Timeout function using threading in python does not work
  • 作業溶液を得るためにいずれの場合も
  • Is there any way to kill a Thread in Python?
    • は、最小限のコードスニペットは役立つだろう。これは主に推測ですが、メインプロセスがタイムアウトをチェックしていない場合は、そのまま継続します。

      長い答え:

      timeoutパラメータがどこに行くのを見てみましょう:

      https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L1060

      self._wait_for_tstate_lock(timeout=max(timeout, 0)) 
      

      https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L1062-L1074

      def _wait_for_tstate_lock(self, block=True, timeout=-1): 
          # Issue #18808: wait for the thread state to be gone. 
          # At the end of the thread's life, after all knowledge of the thread 
          # is removed from C data structures, C code releases our _tstate_lock. 
          # This method passes its arguments to _tstate_lock.acquire(). 
          # If the lock is acquired, the C code is done, and self._stop() is 
          # called. That sets ._is_stopped to True, and ._tstate_lock to None. 
          lock = self._tstate_lock 
          if lock is None: # already determined that the C code is done 
           assert self._is_stopped 
          elif lock.acquire(block, timeout): 
           lock.release() 
           self._stop() 
      

      何のロックは、スレッドが停止していることを確認していない場合。 それ以外の場合は、パラメータblocktimeoutを指定してロックを取得します。

      https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L117

      def acquire(self, blocking=True, timeout=-1): 
          """Acquire a lock, blocking or non-blocking. 
          When invoked without arguments: if this thread already owns the lock, 
          increment the recursion level by one, and return immediately. Otherwise, 
          if another thread owns the lock, block until the lock is unlocked. Once 
          the lock is unlocked (not owned by any thread), then grab ownership, set 
          the recursion level to one, and return. If more than one thread is 
          blocked waiting until the lock is unlocked, only one at a time will be 
          able to grab ownership of the lock. There is no return value in this 
          case. 
          When invoked with the blocking argument set to true, do the same thing 
          as when called without arguments, and return true. 
          When invoked with the blocking argument set to false, do not block. If a 
          call without an argument would block, return false immediately; 
          otherwise, do the same thing as when called without arguments, and 
          return true. 
          When invoked with the floating-point timeout argument set to a positive 
          value, block for at most the number of seconds specified by timeout 
          and as long as the lock cannot be acquired. Return true if the lock has 
          been acquired, false if the timeout has elapsed. 
          """ 
          me = get_ident() 
          if self._owner == me: 
           self._count += 1 
           return 1 
          rc = self._block.acquire(blocking, timeout) 
          if rc: 
           self._owner = me 
           self._count = 1 
          return rc 
      

      スレッドIDを取得ロックを取得します。 カウントをインクリメントします。

      本当にロックを取得します。

      https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L98

      self._block = _allocate_lock() 
      

      https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L33

      _allocate_lock = _thread.allocate_lock 
      

      https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L4

      import _thread 
      

      https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L1300-L1301

      static PyMethodDef thread_methods[] = { 
          {"start_new_thread",  (PyCFunction)thread_PyThread_start_new_thread, 
          METH_VARARGS, start_new_doc}, 
          {"start_new",    (PyCFunction)thread_PyThread_start_new_thread, 
          METH_VARARGS, start_new_doc}, 
          {"allocate_lock",   (PyCFunction)thread_PyThread_allocate_lock, 
          METH_NOARGS, allocate_doc}, 
          {"allocate",    (PyCFunction)thread_PyThread_allocate_lock, 
          METH_NOARGS, allocate_doc}, 
          {"exit_thread",    (PyCFunction)thread_PyThread_exit_thread, 
          METH_NOARGS, exit_doc}, 
          {"exit",     (PyCFunction)thread_PyThread_exit_thread, 
          METH_NOARGS, exit_doc}, 
          {"interrupt_main",   (PyCFunction)thread_PyThread_interrupt_main, 
          METH_NOARGS, interrupt_doc}, 
          {"get_ident",    (PyCFunction)thread_get_ident, 
          METH_NOARGS, get_ident_doc}, 
          {"_count",     (PyCFunction)thread__count, 
          METH_NOARGS, _count_doc}, 
          {"stack_size",    (PyCFunction)thread_stack_size, 
          METH_VARARGS, stack_size_doc}, 
          {"_set_sentinel",   (PyCFunction)thread__set_sentinel, 
          METH_NOARGS, _set_sentinel_doc}, 
          {NULL,      NULL}   /* sentinel */ 
      }; 
      

      タイプPyCFunctionと名前thread_PyThread_allocate_lock

      https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L1128-L1131

      static PyObject * 
      thread_PyThread_allocate_lock(PyObject *self) 
      { 
          return (PyObject *) newlockobject(); 
      } 
      

      https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L538-L553

      static lockobject * 
      newlockobject(void) 
      { 
          lockobject *self; 
          self = PyObject_New(lockobject, &Locktype); 
          if (self == NULL) 
           return NULL; 
          self->lock_lock = PyThread_allocate_lock(); 
          self->locked = 0; 
          self->in_weakreflist = NULL; 
          if (self->lock_lock == NULL) { 
           Py_DECREF(self); 
           PyErr_SetString(ThreadError, "can't allocate lock"); 
           return NULL; 
          } 
          return self; 
      } 
      

      新しいコンテキストを割り当て、

      012をロックと同じようにallocated_lock方法を定義します

      PyThread_type_lock 
      PyThread_allocate_lock(void) 
      { 
          sem_t *lock; 
          int status, error = 0; 
      
          dprintf(("PyThread_allocate_lock called\n")); 
          if (!initialized) 
           PyThread_init_thread(); 
      
          lock = (sem_t *)PyMem_RawMalloc(sizeof(sem_t)); 
      
          if (lock) { 
           status = sem_init(lock,0,1); 
           CHECK_STATUS("sem_init"); 
      
           if (error) { 
            PyMem_RawFree((void *)lock); 
            lock = NULL; 
           } 
          } 
      
          dprintf(("PyThread_allocate_lock() -> %p\n", lock)); 
          return (PyThread_type_lock)lock; 
      } 
      

      https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread.c#L60-L77

      void 
      PyThread_init_thread(void) 
      { 
      #ifdef Py_DEBUG 
          char *p = Py_GETENV("PYTHONTHREADDEBUG"); 
      
          if (p) { 
           if (*p) 
            thread_debug = atoi(p); 
           else 
            thread_debug = 1; 
          } 
      #endif /* Py_DEBUG */ 
          if (initialized) 
           return; 
          initialized = 1; 
          dprintf(("PyThread_init_thread called\n")); 
          PyThread__init_thread(); 
      } 
      

      https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread_pthread.h#L170-L176

      static void 
      PyThread__init_thread(void) 
      { 
      #if defined(_AIX) && defined(__GNUC__) 
          extern void pthread_init(void); 
          pthread_init(); 
      #endif 
      } 
      

      https://github.com/python/cpython/blob/f243de2bc8d940316ce8da778ec02a7bbe594de1/configure.ac#L3416

      AC_CHECK_FUNCS(alarm accept4 setitimer getitimer bind_textdomain_codeset chown \ 
      clock confstr ctermid dup3 execv faccessat fchmod fchmodat fchown fchownat \ 
      fexecve fdopendir fork fpathconf fstatat ftime ftruncate futimesat \ 
      futimens futimes gai_strerror getentropy \ 
      getgrouplist getgroups getlogin getloadavg getpeername getpgid getpid \ 
      getpriority getresuid getresgid getpwent getspnam getspent getsid getwd \ 
      if_nameindex \ 
      initgroups kill killpg lchmod lchown lockf linkat lstat lutimes mmap \ 
      memrchr mbrtowc mkdirat mkfifo \ 
      mkfifoat mknod mknodat mktime mremap nice openat pathconf pause pipe2 plock poll \ 
      posix_fallocate posix_fadvise pread \ 
      pthread_init pthread_kill putenv pwrite readlink readlinkat readv realpath renameat \ 
      select sem_open sem_timedwait sem_getvalue sem_unlink sendfile setegid seteuid \ 
      setgid sethostname \ 
      setlocale setregid setreuid setresuid setresgid setsid setpgid setpgrp setpriority setuid setvbuf \ 
      sched_get_priority_max sched_setaffinity sched_setscheduler sched_setparam \ 
      sched_rr_get_interval \ 
      sigaction sigaltstack siginterrupt sigpending sigrelse \ 
      sigtimedwait sigwait sigwaitinfo snprintf strftime strlcpy symlinkat sync \ 
      sysconf tcgetpgrp tcsetpgrp tempnam timegm times tmpfile tmpnam tmpnam_r \ 
      truncate uname unlinkat unsetenv utimensat utimes waitid waitpid wait3 wait4 \ 
      wcscoll wcsftime wcsxfrm wmemcmp writev _getpty) 
      

      http://man7.org/linux/man-pages/man7/pthreads.7.html

      これは2つのことを尋ねます:タイムアウトはfloatですか? ?isAlive場合は、あなたがチェックしている:timeout引数なし存在しない場合、それは数秒で操作のタイムアウトを指定する浮動小数点数でなければなりません

      (またはその画分)。 join()は常にNoneを返すので、join()の後にis_alive()を呼び出してタイムアウトが起きたかどうかを判断する必要があります。スレッドがまだ生きていれば、join()呼び出しはタイムアウトします。

    関連する問題