2017-01-26 3 views
0

ストリームを取り込んで配列に格納するUDPサーバーを作成するこのスクリプトがあります。 1分ごとにデータを取り込み、クリーンアップして別のサーバーに送信します。これらの操作は、両方とも同じ変数を共有しています。マルチスレッドUDPサーバーでのデータ損失の回避

import socket, time, threading, copy 

UDP_IP = "255.255.255.255" 
UDP_PORT = 4032 

store = [] 

lock = threading.Lock() 

def receive_data(): 
    global queue 
    global lock 

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
    s.bind((UDP_IP, UDP_PORT)) 

    while True: 
     data = s.recv(9999) 
     # store data temporarily 
     lock.acquire() 
     store.append(data) 
     lock.release() 

def send_data(): 
    global store 
    global lock 

    lock.acquire() 
    data = copy.deepcopy(store) 
    store = [] 
    lock.release() 

    # Clean up, send and put a timer 
    threading.Timer(60, send_data).start() 

t1 = threading.Thread(target=receive_data, name='Server') 
t1.start() 

t2 = threading.Thread(target=send_data, name='Sender') 
t2.start() 

私の質問:これはデータ損失を避けるという点で十分なスクリプトですか?私は、変数をロックするとUDPサーバーを保留状態にしてアクセスし、その間に送信されたデータを何とかスキップしてしまう可能性があります。

+0

コードでは、UDPスレッドがロックを処理していることを示していません。それは全く待っていないようです。 – quamrana

+3

一般的なパターンは、スレッド間で共有されるキューを使用することです。キューはスレッドセーフなので、すべての送信関数は定期的に新しいデータがあるかどうかをチェックして送信する必要があります。これはどちらの関数でもロック機構が必要ないことを意味します –

+1

スレッドセーフの 'Queue'を使うべきでしょうか? https://docs.python.org/2/library/queue.htmlまたはhttps://docs.python.org/3.6/library/queue.html –

答えて

1

あなたのコードは次のようであると仮定すると:

import socket, time, threading, copy 

UDP_IP = "255.255.255.255" 
UDP_PORT = 4032 

store = [] 

lock = threading.Lock() 

def receive_data(): 
    global store 
    global lock 
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
    s.bind((UDP_IP, UDP_PORT)) 

    while True: 
     data = s.recv(9999) 
     # store data temporarily 
     lock.acquire() # Note the lock around access to global store 
     store.append(data) 
     lock.release() 

def send_data(): 
    global store 
    global lock 

    lock.acquire() 
    data = store[:] # cheap copy of the contents while locked 
    store = [] 
    lock.release() 

    # Expensive processing of data to send it to another server 
    process(data) 

    # Clean up, send and put a timer 
    threading.Timer(60, send_data).start() 

t1 = threading.Thread(target=receive_data, name='Server') 
t1.start() 

t2 = threading.Thread(target=send_data, name='Sender') 
t2.start() 

は、限りデータの読み取りが懸念している何のホールドアップはありません。とにかく、ソケットはデータをバッファリングします。

+0

答えをありがとう... – Daniyal

関連する問題