2016-10-14 3 views
0

したがって、x個の 'worker'のキューを保持するAutodesk Mayaのための無料のPythonタスクサーバーを作成しています。サーバはいつでも「タスク」を受け入れ、そのタスクをキューに入れて、作業者が解約することができます。タスクキューを持つマルチスレッドpython(mp.Pool)サーバー

キューから、各作業者はサーバーに送信されたMayaファイルの場所を示す「taskDict」と、ヘッドレスMayaアプリケーション(mayapy.exe/standalone)を開くときに実行するコード

私はこれを何度も書き直しました。最初は自分のキューシステムを使っていましたが、私はpythonを使うことにしました。次に、プールを使用し、Que.Queueを使用して、mp.Manager.Queueとプールなどを使用します。情報を受け取り、スレッドをキックする単純なマルチスレッドサーバーの例を見つけるのは難しいですが、あまりにも多くのリクエストを受け取ります。

私は根本的に情報をキューに入れる方法を理解しておらず、キューを介してmp.poolをチャーンさせ、そのデータを使用するapply_asyncプロセスを開始し、完了時にキューに伝えます。だからここ

import tempfile 
import os 
import subprocess 
import threading 
import multiprocessing as mp 
import socket 
import sys 

from PySide import QtGui, QtCore 

import serverUtils 

selfDirectory = os.path.dirname(__file__) 
uiFile = selfDirectory + '/server.ui' 
if os.path.isfile(uiFile): 
    form_class, base_class = serverUtils.loadUiType(uiFile) 
else: 
    print('Cannot find UI file: ' + uiFile) 


def show(): 
    global mayaTaskServerWindow 
    try: 
     mayaTaskServerWindow.close() 
    except: 
     pass 

     mayaTaskServerWindow = mayaTaskServer() 
     mayaTaskServerWindow.show() 
    return mayaTaskServerWindow 

class MayaTaskServer(base_class, form_class): 
    refreshSignal = QtCore.Signal() 

    def __init__(self): 
     super(MayaTaskServer, self).__init__() 

     self.setupUi(self) 

     self.mainJobServer = None 
     self.mpPool = None 
     self.manager = None 
     self.q = None 

     self.workerDict = {} 

     self.refreshSignal.connect(self.refreshTree) 
     self.startLocalCoresBTN.clicked.connect(self.startLocalCoresFn) 
     self.killLocalCoresBTN.clicked.connect(self.killLocalCoresFn) 
     self.jobTree.setContextMenuPolicy(QtCore.Qt.CustomContextMenu) 
     self.jobTree.customContextMenuRequested.connect(self.openMenu) 

     self.startJobServer(6006) 
     self.startQueue() 

     # set the default temp folder 
     filepath = os.path.realpath(__file__) 
     self.localTempFolderEDT.setText(filepath.replace(filepath.split('\\')[-1], '')) 

    ## JOB SYSTEM 
    #################################################################### 

    class MayaWorker(object): 
     def __init__(self, host, port, cpuID): 
      self.host = host 
      self.port = port 
      self.location = None 
      self.cpuID = cpuID 

      self.location = self.host 

      self.busy = False 
      self.task = None 
      self.taskHistory = {} 

     def runTask(self, task): 
      print 'starting task - ', self.task['description'] 
      self.busy = True 
      serverUtils.spawnMaya(task) 
      win.refreshSignal.emit() 

     def taskComplete(self, arg): 
      self.busy = False 
      self.task = None 
      self.mayaFile = None 
      win.refreshSignal.emit() 

    def bootUpLocalWorkers(self, numProcs): 
     self.mpPool = mp.Pool(processes=numProcs) 
     for i in range(0, numProcs): 
      mw = self.MayaWorker('localhost', 6006, i) 
      win.mpPool.apply_async(mw, args=(win.q)) 
      win.workerDict['CPU_' + str(i)] = mw 

    ## USER INTERFACE 
    #################################################################### 

    #UI code here you don't care about 

    ## JOB LISTENER/SERVER/QUEUE 
    #################################################################### 
    class JobServer(threading.Thread): 
     def __init__(self, port): 
      threading.Thread.__init__(self) 
      self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
      self.server_socket.bind(('localhost', port)) 
      self.server_socket.listen(5) 

      self.port = port 
      self.running = True 

      self.mpPool = None 

     def addToQueue(self, task): 
      #add to queue 
      win.q.put(task, timeout=1000) 

      #update UI 
      wid1 = QtGui.QTreeWidgetItem() 
      wid1.setText(0, str(task)) 
      win.queTree.addTopLevelItem(wid1) 

     def run(self, debug=1): 
      print 'Starting Task Server @' + socket.gethostbyname(socket.gethostname()) + ':' + str(self.port) 
      while self.running: 
       client_socket, address = self.server_socket.accept() 
       ip = str(address[0]) 
       data = client_socket.recv(512) 
       if 'runTask' in data: 
        taskDict = eval(data.split(' >> ')[-1]) 
        print 'SERVER>> Received task:', str(taskDict) 
        self.addToQueue(taskDict) 

    class TaskQueueServer(threading.Thread): 
     def __init__(self): 
      q = self.q_in 
      while True: 
       if self.q_in: 
        worker = win.findLocalWorker() 
        if worker: 
         taskDict = self.q_in[0] 
         worker.task = taskDict 
         worker.startTask() 
         self.q_in.pop[0] 


    def startJobServer(self, port): 
     self.mainJobServer = self.JobServer(port) 
     self.mainJobServer.start() 

    def startQueue(self): 
     self.manager = mp.Manager() 
     self.q = self.manager.Queue() 


if __name__ == "__main__": 
    app = QtGui.QApplication(sys.argv) 
    win = MayaTaskServer() 
    win.show() 
    sys.exit(app.exec_()) 

答えて

0

が、私はそれをやった方法は次のとおりです。

は、ここでは、コードの現在の状態です。非常にシンプルで実用的なソリューションです。

私は 'finaLocalWorker'というメソッドを持っています。ワーカークラスが「ビジー」とマークされることがあります。ワーカーがビジーでない場合は、着信タスクがそのタスクに送信されます。

すべての労働者が使用中の場合は、着信タスクは「self.q」と呼ばれる単純なリストに追加されます。

労働者がタスクを終了すると、mpPool.apply_asyncはtaskCompleteメソッドを起動するコールバックを持っています。このメソッドは、 'self.qなら、リストの[0]項目を取り出し、それをポップ(削除)します。それ以外は自分自身が忙しくないとマークする。

これにより、500個のアニメーションのバッチのような着信要求がタスクリストにキューイングされることが可能になりますが、サーバーはいまだにタスクを何も受信せずにすぐに受信タスクを処理できます。

最後のコードをgithubに入れます。

関連する問題