2016-07-07 24 views
5

サーバーからapiデータを取得する非同期メソッドがあります。このコードをローカルマシン上でコンソールアプリケーションで実行すると、高速で実行され、1分あたりのasync関数で数百回のhttpコールが実行されます。しかしAzure WebJobのキューメッセージから同じコードを起動すると、同期して動作しているように見えますが、私の数値クロールは簡単です。Azure WebJob関数で非同期を行う方法

キュー上のメッセージをリッスンし、メッセージに対する処理は、受信取得APIをキックオフ(1).. WebJob関数:

public class Functions 
    { 
     // This function will get triggered/executed when a new message is written 
     // on an Azure Queue called queue. 

     public static async Task ProcessQueueMessage ([QueueTrigger("myqueue")] string message, TextWriter log) 
     { 
      var getAPIData = new GetData(); 
      getAPIData.DoIt(message).Wait(); 
      log.WriteLine("*** done: " + message); 
     } 
    } 

外部アズール速度で非同期モードで動作すること(2)クラスあなたが期待すべき行動は、あなたのプロセスは、一度に1つのメッセージを実行し、処理することで、より多くのインスタンスが作成された場合はスケールアップします(アプリサービスの - ...

class GetData 
    { 
     // wrapper that is called by the message function trigger 
     public async Task DoIt(string MessageFile) 
     { 
      await CallAPI(MessageFile); 
     } 

     public async Task<string> CallAPI(string MessageFile) 
     { 
      /// create a list of sample APIs to call... 
      var apiCallList = new List<string>(); 
      apiCallList.Add("localhost/?q=1"); 
      apiCallList.Add("localhost/?q=2"); 
      apiCallList.Add("localhost/?q=3"); 
      apiCallList.Add("localhost/?q=4"); 
      apiCallList.Add("localhost/?q=5"); 

      // setup httpclient 
      HttpClient client = 
       new HttpClient() { MaxResponseContentBufferSize = 10000000 }; 
      var timeout = new TimeSpan(0, 5, 0); // 5 min timeout 
      client.Timeout = timeout; 

      // create a list of http api get Task... 
      IEnumerable<Task<string>> allResults = apiCallList.Select(str => ProcessURLPageAsync(str, client)); 
      // wait for them all to complete, then move on... 
      await Task.WhenAll(allResults); 

      return allResults.ToString(); 
     } 

     async Task<string> ProcessURLPageAsync(string APIAddressString, HttpClient client) 
     { 
      string page = ""; 
      HttpResponseMessage resX; 

      try 
      { 
       // set the address to call 
       Uri URL = new Uri(APIAddressString); 
       // execute the call 
       resX = await client.GetAsync(URL); 
       page = await resX.Content.ReadAsStringAsync(); 
       string rslt = page; 
       // do something with the api response data 
      } 
      catch (Exception ex) 
      { 
       // log error 
      } 
      return page; 
     } 

    } 
+1

「getAPIData.DoIt(メッセージ).Wait();」を変更しようとしましたか? "getAPIData.DoIt(message);を待つ"; –

+1

ありがとうございましたJason - あなたと他の人のコメントが私の仕事を助けました。とても有難い。 – qtime67

答えて

4

最初にトリガーされた機能がasyncであるため、.Wait()ではなくawaitを使用する必要があります。 Waitは現在のスレッドをブロックします。

public static async Task ProcessQueueMessage([QueueTrigger("myqueue")] string message, TextWriter log) 
{ 
    var getAPIData = new GetData(); 
    await getAPIData.DoIt(message); 
    log.WriteLine("*** done: " + message); 
} 

とにかく、あなたは別のキューでリスニング複数の機能を持っている場合、SDKは、それらを呼び出します

documentation

からパラレル実行便利な情報を見つけることができるでしょうメッセージが同時に受信されたときに並列になります。

単一のキューに対して複数のメッセージを受信した場合も同様です。デフォルトでは、SDKはを一度に16キューメッセージのバッチとし、それらを並列処理する機能を実行します。The batch size is configurable。処理される数値がバッチサイズの半分になると、SDKは別のバッチを取得し、それらのメッセージの処理を開始します。 したがって、関数ごとに処理される同時メッセージの最大数は、バッチサイズの1.5倍です。この制限は、QueueTrigger属性を持つ各関数に個別に適用されます。ここで

バッチサイズを設定するためのサンプルコードです:

var config = new JobHostConfiguration(); 
config.Queues.BatchSize = 50; 
var host = new JobHost(config); 
host.RunAndBlock(); 

はしかし、常に、同時に実行されている、あまりにも多くのスレッドを持っていると悪いパフォーマンスにつながる可能性が良いオプションではありません。

別のオプションは、あなたのwebjobをスケールアウトすることです:

複数のインスタンス

ウェブアプリが複数のインスタンス上で実行されている場合、連続WebJobは、各マシン上で実行され、各マシンは待ちますトリガーを実行し、機能を実行しようとします。 WebJobs SDKキュートリガは、関数がキューメッセージを複数回処理することを自動的に防止します。関数を冪等体に書く必要はありません。ただし、ホストWebアプリケーションのインスタンスが複数ある場合でも関数のインスタンスが1つしか実行されないようにするには、Singleton属性を使用します。

+0

ありがとうThomas - "待っているgetAPIData.DoIt(メッセージ);"私は今、非同期/並行性の2つの本を購入して、それらをよりよく理解するようになりました! – qtime67

+0

@トーマス私は、Webjobsドキュメントの "パラレル実行"と "複数インスタンス"引用符は、関数が非同期であるかどうかにかかわらず適用されると考えています。これらの引用符は、並列処理の利点を前進させるために、メソッドが非同期でなければならないという印象を与えるかもしれません。 – Matt

+0

@ Matt、あなたは正しい – Thomas

2

はこのWebjobs SDK documentationの読み取りを持っています)。複数のキューがある場合は、並行してトリガーされます。

パフォーマンスを向上させるには、「送ったリンク」の「設定の設定」セクションを参照してください。これは、バッチでトリガーできるメッセージの数を示します。

複数のメッセージを並列に処理したいが、インスタンスのスケーリングに頼りたくない場合は、代わりにスレッドを使用する必要があります(asyncはマルチスレッド並列処理ではなく、あなたが使っているスレッド)。したがって、キュートリガ関数はキューからメッセージを読み込み、スレッドを作成し、そのスレッドを「実行して忘れて」、トリガ関数から戻る必要があります。これにより、メッセージは処理済としてマークされ、理論上は以前のメッセージを処理していても、キュー上の次のメッセージを処理できます。エラー処理のための独自のロジックを組み込む必要があります。スレッドが例外をスローした場合やメッセージを処理できない場合(例えばポイズンキューに置くなど)にデータが失われないようにする必要があります。

[queuetrigger]属性を使用しないで、Azureストレージキューsdk API関数を直接使用して、要件ごとにメッセージを接続して処理することもできます。

+0

多くの方に感謝ラッセル - これは私にもっと知ってもらうように案内してくれました。とても有難い。 – qtime67

関連する問題