したがって、x個の 'worker'のキューを保持するAutodesk Mayaのための無料のPythonタスクサーバーを作成しています。サーバはいつでも「タスク」を受け入れ、そのタスクをキューに入れて、作業者が解約することができます。タスクキューを持つマルチスレッドpython(mp.Pool)サーバー
キューから、各作業者はサーバーに送信されたMayaファイルの場所を示す「taskDict」と、ヘッドレスMayaアプリケーション(mayapy.exe/standalone)を開くときに実行するコード
私はこれを何度も書き直しました。最初は自分のキューシステムを使っていましたが、私はpythonを使うことにしました。次に、プールを使用し、Que.Queueを使用して、mp.Manager.Queueとプールなどを使用します。情報を受け取り、スレッドをキックする単純なマルチスレッドサーバーの例を見つけるのは難しいですが、あまりにも多くのリクエストを受け取ります。
私は根本的に情報をキューに入れる方法を理解しておらず、キューを介してmp.poolをチャーンさせ、そのデータを使用するapply_asyncプロセスを開始し、完了時にキューに伝えます。だからここ
import tempfile
import os
import subprocess
import threading
import multiprocessing as mp
import socket
import sys
from PySide import QtGui, QtCore
import serverUtils
selfDirectory = os.path.dirname(__file__)
uiFile = selfDirectory + '/server.ui'
if os.path.isfile(uiFile):
form_class, base_class = serverUtils.loadUiType(uiFile)
else:
print('Cannot find UI file: ' + uiFile)
def show():
global mayaTaskServerWindow
try:
mayaTaskServerWindow.close()
except:
pass
mayaTaskServerWindow = mayaTaskServer()
mayaTaskServerWindow.show()
return mayaTaskServerWindow
class MayaTaskServer(base_class, form_class):
refreshSignal = QtCore.Signal()
def __init__(self):
super(MayaTaskServer, self).__init__()
self.setupUi(self)
self.mainJobServer = None
self.mpPool = None
self.manager = None
self.q = None
self.workerDict = {}
self.refreshSignal.connect(self.refreshTree)
self.startLocalCoresBTN.clicked.connect(self.startLocalCoresFn)
self.killLocalCoresBTN.clicked.connect(self.killLocalCoresFn)
self.jobTree.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
self.jobTree.customContextMenuRequested.connect(self.openMenu)
self.startJobServer(6006)
self.startQueue()
# set the default temp folder
filepath = os.path.realpath(__file__)
self.localTempFolderEDT.setText(filepath.replace(filepath.split('\\')[-1], ''))
## JOB SYSTEM
####################################################################
class MayaWorker(object):
def __init__(self, host, port, cpuID):
self.host = host
self.port = port
self.location = None
self.cpuID = cpuID
self.location = self.host
self.busy = False
self.task = None
self.taskHistory = {}
def runTask(self, task):
print 'starting task - ', self.task['description']
self.busy = True
serverUtils.spawnMaya(task)
win.refreshSignal.emit()
def taskComplete(self, arg):
self.busy = False
self.task = None
self.mayaFile = None
win.refreshSignal.emit()
def bootUpLocalWorkers(self, numProcs):
self.mpPool = mp.Pool(processes=numProcs)
for i in range(0, numProcs):
mw = self.MayaWorker('localhost', 6006, i)
win.mpPool.apply_async(mw, args=(win.q))
win.workerDict['CPU_' + str(i)] = mw
## USER INTERFACE
####################################################################
#UI code here you don't care about
## JOB LISTENER/SERVER/QUEUE
####################################################################
class JobServer(threading.Thread):
def __init__(self, port):
threading.Thread.__init__(self)
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind(('localhost', port))
self.server_socket.listen(5)
self.port = port
self.running = True
self.mpPool = None
def addToQueue(self, task):
#add to queue
win.q.put(task, timeout=1000)
#update UI
wid1 = QtGui.QTreeWidgetItem()
wid1.setText(0, str(task))
win.queTree.addTopLevelItem(wid1)
def run(self, debug=1):
print 'Starting Task Server @' + socket.gethostbyname(socket.gethostname()) + ':' + str(self.port)
while self.running:
client_socket, address = self.server_socket.accept()
ip = str(address[0])
data = client_socket.recv(512)
if 'runTask' in data:
taskDict = eval(data.split(' >> ')[-1])
print 'SERVER>> Received task:', str(taskDict)
self.addToQueue(taskDict)
class TaskQueueServer(threading.Thread):
def __init__(self):
q = self.q_in
while True:
if self.q_in:
worker = win.findLocalWorker()
if worker:
taskDict = self.q_in[0]
worker.task = taskDict
worker.startTask()
self.q_in.pop[0]
def startJobServer(self, port):
self.mainJobServer = self.JobServer(port)
self.mainJobServer.start()
def startQueue(self):
self.manager = mp.Manager()
self.q = self.manager.Queue()
if __name__ == "__main__":
app = QtGui.QApplication(sys.argv)
win = MayaTaskServer()
win.show()
sys.exit(app.exec_())