、AWSにFlaskアプリケーションを50回並行してデプロイするにはどうすればよいですか?
import sys
import requests
import logging
import random
from datetime import datetime
import threading
import os
import time
logger = logging.getLogger('Intrudx')
handle = logging.FileHandler('Intrudx.log')
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handle.setFormatter(formatter)
logger.addHandler(handle)
logger.setLevel(logging.INFO)
loop_count = int(sys.argv[1])
sleep_time = int(sys.argv[2])
# CHECKING THE HEARTBEAT
def heartbeat(SessionID, SiteID):
logger.info("Starting heartbeat thread")
try:
heart_url = 'http://ec2-instance-address.com/license/heartbeat'
heart_result = requests.post(heart_url, json={
"SessionID":str(SessionID),
"SiteID" : str(SiteID)
})
if heart_result.status_code is 500:
logger.error("Heartbeat Failed with 500")
return "We Got 500"
response_text = heart_result.json()["ResponseText"]
logger.info("Heartbeat: "+str(response_text))
except Exception as e:
logger.error("Heartbeat Failed"+str(e))
# FINDING THE SERVER IP
def ip(SessionID):
logger.info("Starting get server info thread")
try:
get_server_url = 'http://ec2-instance-address.com/server/getStreamingServer'
get_server_result = requests.post(get_server_url, json={"SessionID": str(SessionID)})
result_code = get_server_result.status_code
if result_code is 500:
logger.error("GetStreamingServerInfo: " + "Failed")
return "We Got 500"
response_text = get_server_result.json()["ResponseText"]
logger.info("GetStreamingServerInfo: " + str(response_text))
except Exception as e:
logger.error("GetStreamingServerInfo: " + str(e))
def main():
for i in range(loop_count):
# LOGIN
try:
login_url = 'http://ec2-instance-address.com/user/login'
login_result = requests.post(login_url, json={
"AccountName": "Account1",
"UserID": "user2",
"UserPassword": "test"
})
result_code = login_result.status_code
if result_code is 500:
logger.error("Login: "+"Failed")
return "We Got 500"
SessionID = login_result.json()["SessionID"]
response_text = login_result.json()["ResponseText"]
logger.info("Login: "+str(response_text)+": "+ str(SessionID))
print(str(SessionID)+str(response_text))
except Exception as e:
result_code = str(e)
logger.error("Login: "+str(e))
# GET NEW SITE
try:
get_new_site_url = 'http://ec2-instance-address.com/license/getNewSite'
get_new_site_result = requests.post(get_new_site_url, json={"SessionID": str(SessionID)})
result_code = get_new_site_result.status_code
if result_code is 500:
logger.error("Login: " + "Failed")
return "We Got 500"
response_text = get_new_site_result.json()["ResponseText"]
site_id = get_new_site_result.json()["NewSiteID"]
logger.info("getNewSite: "+str(response_text)+": "+str(site_id))
except Exception as e:
result_code = str(e)
logger.error("getNewSite"+str(e))
# STARTING HEARTBEAT THREAD
try:
threading.Thread(target=heartbeat(SessionID, site_id), args=(SessionID, site_id,)).start()
except Exception as e:
logger.error("Problem starting thread: "+str(e))
# STARTING GET SERVER INFO THREAD
try:
threading.Thread(target=ip(SessionID), args=(SessionID)).start()
except Exception as e:
logger.error("Problem while starting Get Server Info Thread"+str(e))
は、このスクリプトは、API呼び出しを行うために、サーバーとの一つのセッション/接続を作成し、ただ一人のユーザーを使用しています。
同様の方法で、API呼び出しを行うサーバーに接続された50人または100人の異なるユーザー(異なるアカウントと資格情報を持つユーザー)でアプリケーションをテストしたいとします。 50人または100人のユーザーが同時にアプリケーションを使用しています。したがって、アプリケーションが50人のユーザーを適切に処理していることを確認できます。
スクリプトを使用してこのようなテストを行うにはどうすればよいですか?
更新:ほとんどのルートは非表示で、@login_requiredが必要です。
ほとんどすべてのAPIルート(ログインとレジスタを除く)は非表示で、@login_requiredが必要です。どのようにそれらのAPIを打つだろうか? – LifelongNoob
ありがとう、仲間。それは私にアイデアを与え、私は自分の小さな図書館を書いた。 – LifelongNoob
ようこそ。あなたが成功してうれしいです。 – Taterhead