2017-11-03 11 views
0

リアルタイムデータストリーミングプロジェクトでは、N分ごとにデータを解析して保存しています。私の目的は、データの最初の分(バッファーとして)を捨てて、サーバーから4分ごとにデータを格納することです。データは、クラスター化および計算(関数はここには含まれていません)する他の関数に解析されます。N分ごとにmqtt pahoでリアルタイムストリーミングデータを保存および解析する方法(Python)

私は 'on_message'という関数で条件を初期化し、データをその関数内で解析しました。私は私の構造化と呼び出しが私の目的を達成する正しい方法だとは思わない。追加の詳細が必要な場合はお知らせください。

ON_MESSAGE

def on_message(r_c_client, userdata, message): 

    if (message.topic == "scanning"): 

    c = datetime.now().time() 
    current = (c.hour * 60 + c.minute) * 60 + c.second 

    time.sleep(60) #initial delay 

    data = json.loads(message.payload.decode("utf-8")) 
    x = data['host'] 
    y = data['data'] 

    hostList = store(x, y) 

    while (current>=total_Time): 
     #time.sleep(60) #initial delay 


     nodeList = listToDf(hostList) 


     nodeDf= df_reformat(nodeList) 
     print clustering_results_reformat(process_startTime, nodeDf) 

ストア機能

def store(host, data): 





    if host in hostList: 
     hostList[host].append(data) 

    else: 
     hostList[host] = [data] 

    return hostList 

メイン

global process_startTime 

t = datetime.now().time() 

process_startTime = (t.hour * 60 + t.minute) * 60 + t.second 

total_Time = process_startTime + 300 #4 minutes + 1 minute 

print t , process_startTime 

broker_address = '10.10.0.100' 
c_client = mqtt.Client("trilateration") 
c_client.on_connect = on_connect 


c_client.on_message = on_message 
c_client.on_subscribe = on_subscribe 


c_client.connect(broker_address, 1883) 

c_client.loop_forever() 

答えて

0

まずアップ、あなたはon_message機能で(スリープ)をブロックすることはありません、この機能は、すべてのために呼ばれていますあなたが眠ったら、システムはその時間の長さを待たなければなりません次のメッセージに移動します。

次に、on_message機能の外にある開始時間を追跡する必要がある場合は、現在の時間を各メッセージのこの値と比較し、保持するかどうかを決定できます。

def on_message(r_c_client, userdata, message): 
    global process_startTime 

    if (message.topic == "scanning"): 
    c = datetime.now().time() 
    current = (c.hour * 60 + c.minute) * 60 + c.second 

    if (current<=total_Time and current>=(process_startTime + 60)): 
    data = json.loads(message.payload.decode("utf-8")) 
    x = data['host'] 
    y = data['data'] 

    hostList = store(x, y) 

メインはこのようなものになります。私はわずか4分後に処理するための関数を呼び出すことができ

global process_startTime 

t = datetime.now().time() 

process_startTime = (t.hour * 60 + t.minute) * 60 + t.second 
total_Time = process_startTime + 300 #4 minutes + 1 minute 
print t , process_startTime 

broker_address = '10.10.0.100' 
c_client = mqtt.Client("trilateration") 
c_client.on_connect = on_connect 

c_client.on_message = on_message 
c_client.on_subscribe = on_subscribe 
c_client.connect(broker_address, 1883) 

while (True): 
    c_client.loop() 
    c = datetime.now().time() 
    current = (c.hour * 60 + c.minute) * 60 + c.second 
    if (current >= total_Time): 
    nodeList = listToDf(hostList) 
    nodeDf= df_reformat(nodeList) 
    print clustering_results_reformat(process_startTime, nodeDf) 
    time.sleep(1) 
+0

をデータが4分以上を渡す前に、あなたの状態は私の関数を呼び出します。 4分後に私の機能が呼び出され、4分おきにもう一度電話がかかります。 – jayen

+0

これはあなたの質問から明らかではなく、投稿したコードは同じことをします。 – hardillb

+0

私は答えを編集しましたが、それは100%正しいわけではありませんが、正しい方向に十分なポイントを与えるはずです。 – hardillb

関連する問題