2017-06-22 7 views
1

背景非同期

をどのように処理すべきか、AzureのバッチでAzureのデータファクトリー活動を実行しているとき、私はややこのシナリオを単純化しているが、これは一般的な問題です。

私はAzureデータファクトリを使用して、カスタムAPIからAzureデータウェアハウスのテーブルにデータを取り込みます。 IDotNetActivityを使用してAPIを呼び出し、データウェアハウスにデータをロードするC#コードを実行しています。アクティビティはAzure Batchで実行されます。

アクティビティ自体の中で、カスタムAPIを呼び出す前に、私はAzure Blobストレージ内のファイルから人のリストを読み込みます。 次に、ファイル内の各ユーザーのカスタムAPIを呼び出します。 これらのコールは、順番に順番に行われます。 この方法は時間がかかりすぎるという問題があります。 ファイルサイズが拡大する可能性が高いため、時間がかかるだけです。私はパフォーマンスを改善しようとした

物事

  • APIは、非同期呼び出しを作ると、不思議なことに、これは遅く走っ3のバッチでそれらを呼び出します。バッチプロセスが非同期を処理しない/それをすべて待っているようです。
  • MoreLinqのバッチコマンドはまったく機能しませんでした。私はこのためのソースコードを確認した: https://github.com/morelinq/MoreLINQ/blob/master/MoreLinq/Batch.cs。これは利回りリターンを使用しますが、なぜこれが動作していないのか、それが非同期/待機の問題に関係しているのか分かりません。

主な質問

Azureのバッチ・サポート・非同期が/待っていますか?

さらに質問

  • アズールは非同期をサポートしていない場合は/この問題にアプローチするためのより良い方法は何かそれから待ちますか?ジョブマネージャを使用し、より多くのノードを回転させる。
  • Azure BatchでMoreLinqのバッチが動作しない理由について、誰かが気にすることはできますか?私の理解パー

    List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword); 
    var customResults = new List<CustomApiResult>(); 
    foreach (var personIdsBatch in personIds.Batch(100)) 
    { 
        customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch)); 
    } 
    

答えて

1

personIds.Batch(100)だけのバッチサイズ(100)バケットにpersonIds:ここ は、影響を受けるコードのスニペットです。 method1は、同じタスクを処理するための追加の論理を追加していながら

//method1 
foreach (var personIdsBatch in personIds.Batch(100)) 
{ 
    customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch)); 
} 

//method2 
customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIds)); 

どちらの上記の方法は、それぞれの人の順にカスタムAPIへの呼び出しを行います。

アズールバッチは非同期/待機をサポートしていますか?また

public class MyDotNetActivity : IDotNetActivity 
{ 
    public IDictionary<string, string> Execute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger) 
    { 
     return ExecuteAsync(linkedServices, datasets, activity, logger).Result; 
    } 

    async Task<IDictionary<string, string>> ExecuteAsync(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger) 
    { 
     List<int> personIds = await GetPersonIds("{clientAddress}", "{clientUsername}", "{clientPassword}"); 
     var tasks = new List<Task<List<CustomApiResult>>>(); 
     foreach (var personIdsBatch in personIds.Batch(100)) 
     { 
      tasks.AddRange(GetCustomResultsByBatch("{address}", "{username}", "{password}", "{personIdsBatch}")); 
     } 

     var taskResults = await Task.WhenAll(tasks); 
     List<CustomApiResult> customResults = taskResults.SelectMany(r=>r).ToList(); 

     //process the custom api results 

     return new Dictionary<string, string>(); 
    } 

    async Task<List<CustomApiResult>> GetCustomResultsByBatch(string address, string username, string password, IEnumerable<int> personIdsBatch) 
    { 
     //Get Custom Results By Batch 
     return new List<CustomApiResult>(); 
    } 

    async Task<List<int>> GetPersonIds(string clientAddress, string clientUsername, string clientPassword) 
    { 
     //load a list of people from a file in Azure Blob storage 
     return new List<int>(); 
    } 
} 

、私は並行して、あなたのsynchronisticalジョブを実行するために、次のようにあなたがParallel.ForEachを活用できることを前提と次のようにあなたのコードに

ベースは、私はあなたがそれを参照してください可能性があり、IDotNetActivity implementionを定義し:

List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword); 
var customResults = new List<CustomApiResult>(); 
Parallel.ForEach(personIds.Batch(100), 
new ParallelOptions() 
{ 
    MaxDegreeOfParallelism=5 
}, 
(personIdsBatch) => 
{ 
    var results = GetCustomResultsByBatch(address, username, password, personIdsBatch); 
    lock (customResults) 
    { 
     customResults.AddRange(results); 
    } 
}); 
+0

ありがとうブルース、これは動作します。 MoreLinq Batchが動作します。これは数ヶ月前には機能しませんでした。また、async/awaitコードも正常に動作します。なぜ以前にはうまくいかなかったのか分かりません。 –