2017-03-17 7 views
1

と背景の非同期ジョブの先頭に同期要求をシミュレート私が最初に私のシステムのアーキテクチャを説明し、その後の質問に移動します:フラスコ

私は私のAPIゲートウェイとして使用されているREST APIを持っています。このサーバーは、Flaskを使用してビルドされています。私もRabbitMQクラスタを持っていて、私が書いたクライアントは特定のキューを待ち受けて、そのタスクを実行します。

これまでのところ、私のリクエストはすべて非同期であったため、リクエストがAPIゲートウェイに届くと、リクエストの一部として結果がPOSTされるURLを持つフィールドがcallback_uriになりました。タスクをRabbitMQに送信し、ワーカーはそのタスクを処理し、最後に結果をコールバックURLにPOSTしました。

私の質問は:

私は新しいエンドポイントは、処理は前と同じ作業者がまだ行われること、の意味で同期したいが、私は戻って結果を取得したいですAPIゲートウェイはユーザーに戻り、接続を解放します。

私の現在のソリューション:

私は以前のように労働者への要求の一部としてユニークなcallback_uriを送るんだけど、今特定のエンドポイントは、POSTとGETメソッドの両方を許可する私のAPIゲートウェイによって実装されています結果が利用可能になるまでワーカーはコールバックURLをポーリングし続け、結果をクライアントに返します。

ビジー状態のHTTPワーカーが結果を取得するために独自のエンドポイントをポーリングする以外の方法がありますか?しかし、結果が利用可能になったときにのみ接続が解放されるように、同期していますか?例示のみのため

コード:

@app.route('/long_task', methods=['POST']) 
@sync_request 
def long_task(): 
    try: 
     if request.get_json() is None: 
      return ERROR_MSG_NO_JSON, 400 
     create_and_send_request_to_rabbitmq() 
     return '', 200 
    except Exception as ex: 
     return ERROR_MSG_NO_DATA, 400 


def sync_request(func): 

    def call(*args, **kwargs): 
     create_callback_uri() 
     result = func(*args, **kwargs) 
     status_code = result[1] 
     if status_code == 200: 
      result = get_callback_result() 
     return result 

    return call 

def get_callback_result(): 
    callback_uri = request.get_json()['callback_uri'] 
    has_answer = False 
    headers = {'content-type': 'application/json'} 
    empty_response = {} 
    content = json.dumps(empty_response) 

    try: 
     with Timeout(seconds=SYNC_REQUEST_TIMEOUT_SECONDS): 
      while not has_answer: 
       response = requests.get(callback_uri, headers=headers) 
       if response.status_code == 200: 
        has_answer = True 
        content = response.content 
       else: 
        time.sleep(0.2) 
    except TimeoutException: 
     log.debug('Timed out on sync request for request %s ' % request) 

    return content, 200 

答えて

2

私が正しくあなたを理解すれば、あなたのバックエンドが(RabbitMQの経由)いくつかの労働者からの応答を待つようにしたいです。 rpc over rabbitmqを実装することでそれを達成できます。重要なアイデアは、相関IDを使用することです。

しかし、最も効率的な方法は、WebSocket(またはブラウザでない場合は生のTCPソケット)でクライアントを実行し、ジョブが完了したときに直接通知することです。そうすれば、リソース(クライアント接続、rabbitmqキュー)をロックせず、パフォーマンスヒット(rpc)を回避できます。

+0

tcpソケットソリューションに関して、バックエンドは通知するタイミングと通知する内容をどのように知っていますか? RPCソリューションはtcpソケットソリューションと一緒に使用する必要がありますか? –

+0

@AvihooMamka RPCソリューションは、古典的なHTTPで実装できます。 RabbitMQからの応答を待つために 'long_task'を修正するだけです。 TCPの場合:バックエンドは通知するタイミングを知らない。しかし、労働者は知っている。そこで、彼はバックエンドに通知します(これは古典的なHTTP要求を介して行うことができ、バックエンドを知る必要があるため、RabbitMQを介して追加情報を送信する必要があります)、バックエンドはクライアントに通知します(バックエンドは接続されたすべてのクライアントを追跡します) 。実装するには非常に難しいかもしれませんが、エッジケース(ランダムな切断など)について覚えておく必要があります。 – freakish

+0

クライアントがHTTP POST要求を送信して、応答が戻ってくるまで待たなければならないことがどのように機能するのか分かりません。同様に、リクエストを投稿し、結果と同じ接続を待っているクライアントのHTTP接続に接続するには、提案されたソリューションをどのように使用しますか? –