2017-10-03 17 views
0

複雑なタスクを処理するために非同期rxjavaフローを使用しようとしています。RXjavaの複雑な非同期タスク

メトリクスのリストと都市のリストを取得し、その値(日付と値のリスト)を取得します。各メトリックは異なるデータベースに格納されるため、多くのHTTPコールが必要です。すべてのメトリックおよび非同期オーバー

実行:私がしたいよう

結果の順序が何を意味します。 すべてのメトリックについて、すべての都市を非同期で実行したいと考えています。 各都市とメトリックについて、データを取得するためのHTTP呼び出しを作成したいとします。

終了応答は次のようになります。

"result": { 
    "metric1": [ 
    { 
     "cityId": 8, 
     "values": [ 
     { 
      "date": "2017-09-26T10:49:49", 
      "value": 445 
     }, 
     { 
      "date": "2017-09-27T10:49:49", 
      "value": 341 
     } 
     ] 
    }, 
    { 
     "cityId": 9, 
     "values": [ 
     { 
      "date": "2017-09-26T10:49:49", 
      "value": 445 
     }, 
     { 
      "date": "2017-09-27T10:49:49", 
      "value": 341 
     } 
     ] 
    } 
    ], 
    "metric2": [ 
    { 
     "cityId": 8, 
     "values": [ 
     { 
      "date": "2017-09-26T10:49:49", 
      "value": 445 
     }, 
     { 
      "date": "2017-09-27T10:49:49", 
      "value": 341 
     } 
     ] 
    }, 
    { 
     "cityId": 9, 
     "values": [ 
     { 
      "date": "2017-09-26T10:49:49", 
      "value": 445 
     }, 
     { 
      "date": "2017-09-27T10:49:49", 
      "value": 341 
     } 
     ] 
    } 
    ] 
} 
    } 
} 

私はこれをどのように達成することができますか?すべての観測で失われました。本当に価値があるはずのブロックはどれですか?

EDIT:追加のサンプルコード ここでは、私がこれまで持っているものです。

1. for each metric (parallel I guess): 


     Observable.from(request.getMetrics()) 
          .subscribe(metric -> { 
           List metricDataList = getMetricData(metric, citiesList); 
           result.put(metric.getName(), metricDataList); 
          }); 

       return ImmutableMap.of("result", result); 


    2. Get metric data (according to cities): 

     final List<Map<String,Object>> result = new ArrayList<>(); 
       Observable.from(citiesList).subscribe(city -> 
         result.add(ImmutableMap.of(
           "id", city.getId(), 
           "name", city.getName(), 
           "value", getMetricValues(metricType, city.getId()) 
            .toBlocking() 
            .firstOrDefault(new ArrayList<>())))); 

       return result; 

3. According to metric I decide which service to invoke: 
private Observable<List<AggregatedObject>> getMetricValues(AggregationMetricType metric, Integer cityId) { 

     switch (metric) { 
      case METRIC_1:  return fetchMetric1(city); 
      case METRIC_2:  return fetchMetric2(city); 
     } 

     return Observable.empty(); 
    } 


4. Invoke the service: 

public Observable<List<AggregatedObject>> fetchMetric1(Integer city) { 
     return Observable.just(httpClient.getData(city) 
       .map(this::transformCountResult) 
       .onErrorResumeNext(err -> Observable.from(new ArrayList<>())); 
    } 

5. Transform the received data: 
protected List<AggregatedObject> transformCountResult(JsonNode jsonNode) { 
     ... 
    } 

私は、ブロッキングと同時実行性について正しく実装されていることを確認していないけれどもそれは、働いています。

お願いします。

よろしく、 イド

+0

紛失した箇所を表示する必要があります。たとえば、1つの都市に対して1つのメトリックを実行できますか? –

+0

ありがとうございました..質問にコードサンプルを追加しました。 –

答えて

0

あなたは「2)メトリックデータを取得する」と呼ぶのステップで、あなたが処理のtoBlocking()外を移動

final List<Map<String,Object>> result = new ArrayList<>(); 
    Observable.from(citiesList) 
    .flatMap(city -> getMetricValues(metricType, city.getId()) 
         .firstOrDefault(new ArrayList<>()) 
         .map(metric -> ImmutableMap.of(
             "id", city.getId(), 
             "name", city.getName(), 
             "value", metric))) 
    .toList() 
    .toBlocking() 
    .subscribe(resultList -> result = resultList); 

としてリファクタリングができ、個々の要求がで発生することを意味しますよりパラレルなファッション。