2017-03-09 14 views
0

私はPairRDD rdd1に整数キーと整数[]の値を持っています。spark - 別のRDDの変換で(Java)PairRDDのキーと値を調べる方法

また、整数キーとDouble値を持つPairRDD rdd2もあります。

キー内の各整数AND rdd1の値は、キーとしてrdd2にも存在します。

私はxのdouble値と各整数y1y2、...、ynのすべてのdouble値を取得するためにrdd1の各ペア(x, [y1,y2,...,yn])のためにしたいです。

Map<Integer,Double>map2)として収集しようとしましたが、メモリに収まらず、OOMエラーが発生します。また、rddに参加しようとしましたが、キーと値の両方を結合する方法を理解できませんでした。 rdd2lookup()メソッドをrdd1の内部で使用することはできません。 yの各yiため

map each (int x, int[] y) in rdd1 to: 
     (x, map2.get(x) + sum(map2.get(yi))) 

私が何をしたいの擬似コードは次のようです。

私はJavaを使用していますが、JavaとScalaの両方で同じ問題が発生していると思います。

答えて

1

一致が見つからない場合(rdd1にインデックスがあり、対応するインデックスがrdd2にない場合)に応じて、クエリは次のようになります。これについて

rdd1. 
    // (x, [ y1, ..., yn ]) -> (x, x), (y1, x), ..., (yn, x) 
    flatMap { case (x, ys) => (x :: ys).map((_, x)) }. 
    // (xory, x) -> (xory, (x, rdd2.lookup(xory))) 
    leftOuterJoin(rdd2). 
    // (xory, (x, rdd2.lookup(xory))) -> (x, rdd2.lookup(xory)) 
    map(_._2). 
    // (x, rdd2.lookup(x)), ... -> (x, rdd2.lookup(x) + sum_i(rdd2.lookup(y_i)) 
    reduceByKey{ case (dopt1, dopt2) => (dopt1 ++ dopt2).reduceOption(_ + _) }. 
    // unwrap the option types 
    mapValues(_.getOrElse(0.0)) 
-1
HashMap<Integer, List<Integer>> map = new HashMap<>(); 
    map.put(1,asList(2,3)); 
    map.put(3,asList(4,5)); 

    System.out.println(
      map.entrySet().stream() 
        .flatMap(kv -> 
          Stream.concat(
            Stream.of((double)kv.getKey()), 
            kv.getValue().stream().mapToDouble(x -> Double.valueOf((double)x)).boxed()) 
        ) 
        .collect(Collectors.toList()) 
      ); 

どのように? ... 2つ目のRDDでキーとして使用できる1つのRDDにすべての(キーと値)を与える必要があります。もちろん、タイプを変更することはできます。

関連する問題