リアルタイムデータストリーミングプロジェクトでは、N分ごとにデータを解析して保存しています。私の目的は、データの最初の分(バッファーとして)を捨てて、サーバーから4分ごとにデータを格納することです。データは、クラスター化および計算(関数はここには含まれていません)する他の関数に解析されます。N分ごとにmqtt pahoでリアルタイムストリーミングデータを保存および解析する方法(Python)
私は 'on_message'という関数で条件を初期化し、データをその関数内で解析しました。私は私の構造化と呼び出しが私の目的を達成する正しい方法だとは思わない。追加の詳細が必要な場合はお知らせください。
ON_MESSAGE
def on_message(r_c_client, userdata, message):
if (message.topic == "scanning"):
c = datetime.now().time()
current = (c.hour * 60 + c.minute) * 60 + c.second
time.sleep(60) #initial delay
data = json.loads(message.payload.decode("utf-8"))
x = data['host']
y = data['data']
hostList = store(x, y)
while (current>=total_Time):
#time.sleep(60) #initial delay
nodeList = listToDf(hostList)
nodeDf= df_reformat(nodeList)
print clustering_results_reformat(process_startTime, nodeDf)
ストア機能
def store(host, data):
if host in hostList:
hostList[host].append(data)
else:
hostList[host] = [data]
return hostList
メイン
global process_startTime
t = datetime.now().time()
process_startTime = (t.hour * 60 + t.minute) * 60 + t.second
total_Time = process_startTime + 300 #4 minutes + 1 minute
print t , process_startTime
broker_address = '10.10.0.100'
c_client = mqtt.Client("trilateration")
c_client.on_connect = on_connect
c_client.on_message = on_message
c_client.on_subscribe = on_subscribe
c_client.connect(broker_address, 1883)
c_client.loop_forever()
をデータが4分以上を渡す前に、あなたの状態は私の関数を呼び出します。 4分後に私の機能が呼び出され、4分おきにもう一度電話がかかります。 – jayen
これはあなたの質問から明らかではなく、投稿したコードは同じことをします。 – hardillb
私は答えを編集しましたが、それは100%正しいわけではありませんが、正しい方向に十分なポイントを与えるはずです。 – hardillb