2017-11-01 5 views
1

Map<K, Observable<V>>を実行してMap<K, V>に変換するより洗練された方法がありますか?オブザーバブルマップの実行

@Test 
public void test() { 
    final ImmutableMap<String, Observable<Integer>> map = ImmutableMap.of(
     "1", Observable.just(1), 
     "2", Observable.just(2), 
     "3", Observable.just(3) 
    ); 

    Map<String, Integer> result = new HashMap<>(map.size()); 

    final Integer execRes = map.entrySet() 
     .stream() 
     .map(entry -> { 
      entry.getValue().subscribe(res -> result.put(entry.getKey(), res)); 
      return entry.getValue(); 
     }) 
     .reduce(Observable::concat).get().toBlocking().last(); 

    Assert.assertTrue(execRes == 3); 
    Assert.assertTrue(1 == result.get("1")); 
    Assert.assertEquals("{1=1, 2=2, 3=3}", result.toString()); 
} 

P.S.:

私は、次の方法を見つけましたrxjava-1.1.7が使用され、Observableコードを同時に(同時に)実行する必要があります

+0

[ 'toMap'](http://reactivex.io/RxJava/javadoc/io/reactivexでおそらく見て/Observable.html#toMap-io.reactivex.functions.Function-io.reactivex.functions.Function-)、そのオーバーロードが役立つ可能性があります。 – nullpointer

答えて

1

どうやってですか? Rxのを使用して同じ

Map<String, Integer> result = map.entrySet().stream() 
     .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toBlocking().first())); 

:Javaの8つのストリームの使用

Map<String, Integer> result = Observable.from(map.entrySet()) 
     .toMap(Map.Entry::getKey, a -> a.getValue().toBlocking().first()).toBlocking().first(); 
+0

Observablesがここで順番に実行されるのを見て、それは良くありません。使用されているスレッド設定に従って並列に実行する必要があります。 – yetanothercoder

+0

「スレッド設定」とは何ですか?提供したコードは単一のスレッドで実行されます。 – Malt

+0

あなたは正しいですが、 'Observable ... subscribeOn(Schedulers ...')を使用して複数のスレッドでコードをチェックし、それも順番に実行します、興味深いことに私のコードは並列で実行されますが、あなたの答えを解決策として受け入れることは正しいと思いますが、このケースでは並列性について別の質問を作成することがあります。 ありがとう! – yetanothercoder

0

を私はあなたがGuavaを使用していたので、我々はGuavaにいくつかのconvinent方法を使用することができます。

値が単一でない場合でも、Multimapに変換することができます。ここで

は私のコードです:

import java.util.Map; 

import com.google.common.collect.ArrayListMultimap; 
import com.google.common.collect.ImmutableMap; 
import com.google.common.collect.Maps; 
import com.google.common.collect.Multimap; 

import rx.Observable; 

public class Q47057374 { 
    public static void main(String[] args) { 
    final ImmutableMap<String, Observable<Integer>> map = ImmutableMap.of(
     "1", Observable.just(1), 
     "2", Observable.just(2), 
     "3", Observable.just(3)); 
    System.out.println(toMap(map)); 
    final ImmutableMap<String, Observable<Integer>> multimap = ImmutableMap.of(
     "1", Observable.just(1, 2, 3), 
     "2", Observable.just(4, 5, 6), 
     "3", Observable.just(7, 8, 9)); 
    System.out.println(toMutimap(multimap)); 
    } 

    public static <K, V> Map<K, V> toMap(Map<K, Observable<V>> map) { 
    return Maps.transformValues(map, o -> o.toSingle().toBlocking().value()); 
    } 

    public static <K, V> Multimap<K, V> toMutimap(Map<K, Observable<V>> map) { 
    ArrayListMultimap<K, V> multimap = ArrayListMultimap.create(); 
    map.forEach((k, vo) -> vo.forEach(v -> multimap.put(k, v))); 
    return multimap; 
    } 
} 

そして出力:

{1=1, 2=2, 3=3} 
{1=[1, 2, 3], 2=[4, 5, 6], 3=[7, 8, 9]}