2016-07-06 7 views
0

現在、私は、TCP/UDPソケットからデータを受信し、そのデータをファイルに書き込むPythonプログラムを作成しています。現在、私のプログラムは、各データグラムをファイルに書き込むことによってI/Oバウンドになります(非常に大きなファイルの場合はこれを行いますので、減速は相当です)。そのことを念頭に置いて、私はソケットから1つのスレッドでデータを受信し、そのデータを別のスレッドに書き込もうと決めました。これまでのところ、私は次の大まかな草案を思いついた。現時点では、ファイルには単一のデータチャンク(512バイト)しか書き込まれません。ソケットデータを1つのスレッドで受信し、別のスレッドにデータを書き込む - Python

f = open("t1.txt","wb") 
def write_to_file(data): 
    f.write(data) 

def recv_data(): 
    dataChunk, addr = sock.recvfrom(buf) #THIS IS THE DATA THAT GETS WRITTEN 
    try: 
     w = threading.Thread(target = write_to_file, args = (dataChunk,)) 
     threads.append(w) 
     w.start() 
     while(dataChunk): 
      sock.settimeout(4) 
      dataChunk,addr = sock.recvfrom(buf) 
    except socket.timeout: 
     print "Timeout" 
     sock.close() 
     f.close() 

threads = [] 
r = threading.Thread(target=recv_data) 
threads.append(r) 
r.start() 

私は何か間違っていると思いますが、スレッドを使用する最良の方法は何か分かりません。今、私の問題は、スレッドを作成するときに引数を指定する必要がありますが、その引数の値が新しいデータチャンクを反映するように正しく変更されないということです。 while(dataChunk)ループ、私は繰り返しごとに新しいスレッドを作成していませんか?

また、これは価値があります。これは、個別の受信スレッドと書き込みスレッドを使用する小さな概念実証です。これは最終的にこのコンセプトを活用すべき大きなプログラムではありません。

答えて

1

読み取りスレッドが書き込みを行い、書き込みスレッドが読み取るバッファが必要です。 A deque from the collections moduleは完璧です。パフォーマンスの低下なしに両サイドからappend/popを許可するためです。

したがって、dataChunkをスレッドに渡すのではなく、バッファに渡してください。

import collections # for the buffer 
import time # to ease polling 
import threading 

def write_to_file(path, buffer, terminate_signal): 
    with open(path, 'wb') as out_file: # close file automatically on exit 
     while not terminate_signal.is_set() or buffer: # go on until end is signaled 
     try: 
      data = buffer.pop() # pop from RIGHT end of buffer 
     except IndexError: 
      time.sleep(0.5) # wait for new data 
     else: 
      out_file.write(data) # write a chunk 

def read_from_socket(sock, buffer, terminate_signal): 
    sock.settimeout(4) 
    try: 
     while True: 
     data, _ = sock.recvfrom(buf) 
     buffer.appendleft(data) # append to LEFT of buffer 
    except socket.timeout: 
     print "Timeout" 
     terminate_signal.set() # signal writer that we are done 
     sock.close() 

buffer = collections.deque() # buffer for reading/writing 
terminate_signal = threading.Event() # shared signal 
threads = [ 
    threading.Thread(target=read_from_socket, kwargs=dict(
    sock=sock, 
    buffer=buffer, 
    terminate_signal=terminate_signal 
)), 
    threading.Thread(target= write_to_file, kwargs=dict(
    path="t1.txt", 
    buffer=buffer, 
    terminate_signal=terminate_signal 
)) 
] 
for t in threads: # start both threads 
    t.start() 
for t in threads: # wait for both threads to finish 
    t.join() 
+0

2つのこと:1。私はあなたがどこか(おそらくそうのような各スレッドを開始する必要があると仮定しています 'スレッドでtについて:t.start()'と2.それはまだ動作していないと思われます今ではファイルは226バイトしかありません。また、最初のものではなく、最後の226バイトのデータが得られているようです。 – Swoldier

+0

@Swoldier両方のスレッドを開始する必要があります。 – MisterMiyagi

+0

@Swoldier上記のコードは実際には実行されておらず、例えば 'sock.recvfrom(buf)'に 'bug'が定義されていないのでできません。 – MisterMiyagi

関連する問題