2017-12-05 6 views
1

私は400 +の従業員オブジェクトを処理する必要があります。これには、それぞれに観測可能なサービスメソッドを呼び出す必要があります。RxJSを使用してクラッシュすることなく何百ものリクエストを処理するにはどうすればよいですか?

すべての従業員が処理された時期を知る必要があります。クライアントまたはサーバーに同時要求が多重化されない方法を見つける必要があります。現時点では何が起こっているのですか。

各従業員の処理結果を配列に集めて、そこからデータを処理することができます(エクスポートファイルを作成しています)。

私は forkjoin RxJSに連結をしようとしているが、私はまだタスクはすべて同時にオフ発射400+持っているようです。再び私のブラウザがハングし、ネットワークの数 - 処理が(ATMでのキューのように)、1つずつを扱っていることを示唆しているが、私は確信していない連結について

I read this pageリクエストは、私にはいつでも同じように示唆されています。

コードを取得して、一度に1人の従業員を処理し、リスト内の次の従業員を処理する前に終了することができれば、うまくいくでしょう。

processEmployees(): AsyncSubject<boolean> { 
    let employeesProcessed: AsyncSubject<boolean> = new AsyncSubject<boolean>(); 
    let listOfProcessesToRun = []; 

    this.employeeProfilesList.forEach(
     (employeeProfile: EmployeeProfile) => { 
      listOfProcessesToRun.push(
       this.annualLeaveCalculationService.startProcess(employeeProfile, this.yearToProcess).delay(2000) 
      ); 
     } 
    ) 

    Observable.concat(listOfProcessesToRun) 
     .finally(() => { 
     }) 
     .subscribe(
     (calculationResults: AnnualLeaveCalculationResults) => { 
      this.calculationResults.push(calculationResults); 
      employeesProcessed.complete(); 
     }, 
     error => { 
     }); 

    return employeesProcessed; 
} 

私は観測の配列を処理するためにObservable.forkjoinを使用してみましたが、私は要求を並列に処理されることを信じて、それが原因があることにぶら下がって私のマシンになりました約400名の従業員が処理する。

私はネストされた解決策を考えました。

processemployee(X) { 
    doServiceCall(this.employees[x]) 
    .subscribe(response => { 
      processemployee(++x); // with a check to detect number of loops 
     } 
    ); 
} 

しかし、それは素晴らしいパターンです。

答えて

4

.mergeMapオーバーロードを使用すると、concurrenyが使用され、サービスに何か(20req並行)のように設定されます。そうすれば、サーバーを揺らすことなく最大スループットを最適化できます。

+0

ああ、おかげで面白いと思う。私は今それを試み、私の結果を投稿します。とても有難い! –

+0

私は***の同時***値を1つ使用しなければなりませんでした。そうでなければ、外側の観測可能な出力が繰り返し表示されるなど、いくつかの奇妙な状況に遭遇しました。多分私はマージマップが何をしているのか理解しておく必要があります。さもなければ、並行パラメータの値が1で、それは私が望むのと同じように機能しました。 –

0

私は最近、これを正しく理解すれば、一種の類似の問題に踏み込んでいます。

私は何千もの並列処理要求を出しました。数字は高すぎて、全体が崩壊しました。

bufferCount()concatMap()の組み合わせを使用して、私が望むことを達成することができました。

  • bufferCount()分割チャンク内の元のストリーム、 を含むすべての要素
  • concatMap()の同じ数が引数として渡された関数 介して(並列に)最初のチャンクを詳述 - 機能は、観測を返す - 場合 観測可能な完了後、2番目のチャンクが取られるなど

多分、このアプローチが役立ちます。

関連する問題