2017-05-10 19 views
0

私はTwistedでFTPテストサーバーとクライアントを構築しています。サーバーはうまく動作します。 Twisted ftpserver.pyの例と基本的に同じです。クライアントは、ファイルの取得と書き込み中にいくつかのブロック問題が発生しています。私はいくつかの簡単なツイストのスレッドユーティリティを使って解決しようとしましたが、役に立たないものです。ここで ノンブロッキングクライアントFTPの取得と書き込み

が私のサーバーです:

#!/usr/bin/env python2 
from __future__ import print_function, division, absolute_import 

# Import twisted things 

from twisted.protocols.ftp import FTPFactory 
from twisted.protocols.ftp import FTPRealm 
from twisted.internet import reactor 
from twisted.cred.portal import Portal 
from twisted.cred.checkers import AllowAnonymousAccess 

p = Portal(FTPRealm("test/"), [AllowAnonymousAccess()]) 

f = FTPFactory(p) 
f.timeOut = None 

reactor.listenTCP(5504, f) 
reactor.run() 

クライアント側、これと対になって、あなたが取得したいファイルの名前を書くために、テキストボックスを提示し、簡単なwxPythonのGUIのである。このGUI内。方法は50ミリ秒ごとに実行されるwx.Timerがあります。これが私のFTPファイルの取得を妨げているのです。私は、主なスレッドが使い果たされているため、データを受け取るプロトコルにはしゃっくりがあることがわかります。なぜ私がこの設定を持っているのか不思議であれば、はるかに大きなプロジェクトのユースケースをシミュレートしています。

ファイルを取得する必要があるときに、特定の時点でdeferToThreadを使用することを試みました。しかし、現在のスレッドを印刷することによって、データを受信して​​いるプロトコルがメインスレッドで実行されていることがわかります。これは私が解決しようとしている問題です。どんな助けも大歓迎です。

私のクライアントコード:

#!/usr/bin/env python2 
from __future__ import print_function, division, absolute_import 

import wx 
import sys 
import threading 

from twisted.internet import wxreactor 
wxreactor.install() 

from twisted.internet import reactor 

from twisted.protocols.ftp import FTPClient 

from twisted.internet import protocol 
from twisted.internet import threads 
from twisted.python import log 

# This is the GUI 
class TextSend(wx.Frame): 

    def __init__(self): 
     wx.Frame.__init__(self, None, -1, "Request Files", size=(200, 75)) 

     self.protocol = None # ftp client protocol 
     self.factory = None 

     panel = wx.Panel(self) 

     vertSizer = wx.BoxSizer(wx.VERTICAL) 
     horzSizer = wx.BoxSizer(wx.HORIZONTAL) 

     self.fileName = None 
     self.textbox = wx.TextCtrl(parent=panel, id=100, size=(100,-1)) 
     self.btn = wx.Button(panel, label="Retr.") 

     # timer and checkbox for timer 
     self.timer = wx.Timer(self, id=wx.ID_ANY) 
     self.check = wx.CheckBox(parent=panel, label="Start blocking") 

     #Bind 
     self.textbox.Bind(wx.EVT_TEXT, self.getText) 
     self.btn.Bind(wx.EVT_BUTTON, self.press) 
     self.check.Bind(wx.EVT_CHECKBOX, self.onCheck) 
     self.Bind(wx.EVT_TIMER, self.onTimer, self.timer) 

     horzSizer.Add(self.textbox, flag=wx.ALIGN_CENTER) 
     horzSizer.Add(self.btn, flag=wx.ALIGN_CENTER) 

     vertSizer.Add(horzSizer, flag=wx.ALIGN_CENTER) 
     vertSizer.Add(self.check, flag=wx.ALIGN_CENTER) 

     panel.SetSizer(vertSizer) 
     panel.Layout() 

    def getText(self, evt): 
     self.fileName = str(self.textbox.GetValue()) 

    def onCheck(self, evt): 
     yes = self.check.GetValue() 
     if yes: 
      print("Starting timer") 
      self.timer.Start(50) 
     else: # no 
      self.timer.Stop() 

    def onTimer(self, evt): 
     #print("Triggered timer") 
     pass 

    def press(self, evt): 
     print("Send:", self.fileName) 

     d = threads.deferToThread(self.retrieve) 
     d.addCallback(self.done) 

    def retrieve(self): 
     print(threading.current_thread()) 
     # This is what does the retrieving. Pass in FileWriter and 
     # FileWriter's dataReceived method is called by main thread 
     self.protocol.retrieveFile(self.fileName, FileWriter(self.fileName), offset=0).addCallbacks(self.done, self.fail) 
     return "Done with deferToThread" 

    def done(self, msg): 
     print(threading.current_thread()) 
     print("DONE Retrieving:", msg) 

    def fail(self, error): 
     print('Failed. Error was:') 
     print(error) 

# This writes to the file of a same name as the one retrieved. 
class FileWriter(protocol.Protocol): 

    def __init__(self, fileName): 
     self.f = open(fileName, 'wb') 
     print("FROM FileWriter __init__:", threading.current_thread()) 

    def dataReceived(self, data): 
     print("Byte size", len(data)) 
     print("FROM FileWriter dataReceived:", threading.current_thread()) 
     self.f.write(data) 

    def connectionLost(self, reason): 
     print("Writing closed and done") 
     print("FROM FileWriter connectionLost:", threading.current_thread()) 
     self.f.close() 

# Client FTP Protocol 
class TestClient(FTPClient, object): 

    def __init__(self, factory, username, password, passive): 
     super(TestClient, self).__init__(username=username, password=password, passive=passive) 
     self.factory = factory 

    def connectionMade(self): 
     print("hello") 
     gui = self.factory.gui 
     gui.protocol = self 

# Twisted Client Factory 
class FileClientFactory(protocol.ClientFactory): 

    def __init__(self, gui): 
     self.gui = gui 
     self.protocol = None 

    def buildProtocol(self, addr): 
     user = 'anonymous' 
     passwd = '[email protected]' 
     self.protocol = TestClient(self, username=user, password=passwd, passive=1) 
     return self.protocol 

    def clientConnectionLost(self, transport, reason): 
     print("Connectiong lost normally:", reason) 

    def clientConnectionFailed(self, transport, reason): 
     print("Connection failed:", reason) 


if __name__ == "__main__": 
    # Initialize and show GUI 
    logger = log.startLogging(sys.stdout) 
    app = wx.App(False) 
    app.frame = TextSend() 
    app.frame.Show() 
    reactor.registerWxApp(app) 

    # Build Factory 
    f = FileClientFactory(app.frame) 

    # Connect to FTP server 
    reactor.connectTCP("localhost", 5504, f) 
    reactor.run() 

    wxPython main loop. 
    app.MainLoop() 

答えて

1

あなたはdeferToThread(function_that_uses_twisted_apis)することはできません。ツイストされたAPIはほとんどすべてのスレッドセーフではありません。リアクタースレッドでのみ使用する必要があります(例外は、スレッドスケジューリング関連のAPIの2つです)。

代わりに、ブロックコードを削除してください。 を別のスレッド、別のプロセスに入れたり、ノンブロッキングに書き換えてください。

+0

このアプローチを試してみましょう。 – Tristan

関連する問題