2017-06-08 13 views
0

私はこのようなタスクのMapています。各Map<Identifier, Progress> elementは、各タスクからの最新の要素が含まれていマップを結合するRxJavaコンビネータ?

Observable<Map<Identifier, Progress>> 

:私はこれを変換したい

Map<Identifier, Observable<Progress>> tasks; 

を。

私はすべてのタスクが完了し、タスクの任意の 1が故障したときに失敗している場合にのみ、完了するために、新しいObservableをしたいです。

これにコンビネータがありますか?私はよくあなたのアイデアを得る願っていcombineLatestオペレータ

Observable.combineLatest(List<Observable<Pair<Identifier, Progress>>>, 
    (List<Pair<Identifier, Progress>>) -> {merge into `Map`}) 

を使用し、一緒にこれらのタスクを組み合わせること

Observable.just(Identifier) 
    .flatMap(Observable<Progress> progressObservable, 
     (Identifier, Progress, Pair<Identifier, Progress>) -> {create pair of elements}) 

:シングルMap<Identifier, Observable<Progress>> task<Identifier, Progress>への使用を変換するために

答えて

1

。もちろん、疑似コードで書かれています。


実装:

public static <T, S> Observable<Map<T, S>> zipMaps(final Map<T, Observable<S>> tasks) { 
    Objects.requireNonNull(tasks, "tasks is null"); 
    return Observable.combineLatest(
     tasks.entrySet() 
      .stream() 
      .map(entry -> Observable.combineLatest(
       Observable.just(entry.getKey()), 
       entry.getValue(), 
       Pair::with)) 
      .collect(ImmutableList.toImmutableList()), 
     xs -> Arrays.stream(xs) 
      .map(x -> (Pair<T, S>)x) 
      .collect(ImmutableMap.toImmutableMap(Pair::getValue0, Pair::getValue1))); 
} 
+0

が動作しているようです。ヒントから実装を追加しました。 – sdgfsdh

関連する問題