2017-05-31 2 views
1

私はspark javaプログラムを持っています。ここでは、mapValuesステップを持つgroupByKeyが実行され、すべての入力rdd値のIterableという値を持つPairRDDが返されます。 groupByKeyの代わりにreduceByKeyをmapValuesに置き換えるとパフォーマンスが向上しますが、ここで問題にreduceByKeyを適用する方法はわかりません。spark javaでIterable値として返すためにreduceByKeyとgroupByKeyを置き換える方法は?

具体的には、タイプTuple5の値を持つ入力ペアRDDがあります。 groupByKeyとmapValuesの変換の後、値が入力値の反復可能である必要があるキーと値のペアRDDを取得する必要があります。

JavaPairRDD<Long,Tuple5<...>> inputRDD; 
... 
... 
... 
JavaPairRDD<Long, Iterable<Tuple5<...>>> groupedRDD = inputRDD 
    .groupByKey() 
    .mapValues(
      new Function<Iterable<Tuple5<...>>,Iterable<Tuple5<...>>>() { 

       @Override 
       public Iterable<Tuple5<...>> call(
         Iterable<Tuple5<...>> v1) 
         throws Exception { 

        /* 
        Some steps here..        
        */ 

        return mappedValue; 
       } 
      }); 

reduceByKeyを使用して上記の変換を行う方法はありますか?

+0

「ここのいくつかのステップ」とは何ですか?あなたはそれを減らすための論理が必要になります。 – philantrovert

+0

'mapValues'関数では、実際には' Tuple5'内のキーに基づいて各値をソートしています。私はそれがここでは関係ないと思ったので、私はそれらを含めなかったのです。 – Vishnu

+0

_私は、groupByKeyの代わりにreduceByKeyをmapValuesに置き換えると、パフォーマンスが向上しますが、間違って読んだことを読んでいます。 – zero323

答えて

1

私はSparkでScalaを使用していましたので、これはあなたが好むかもしれない正確な答えにはなりません。 groupByKey/mapValuesreduceByKey間符号化の主な違いは、このarticleから適合簡単な例を用いて見ることができる。

val words = Array("one", "two", "two", "three", "three", "three") 
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) 

val wordCountsWithGroup = wordPairsRDD. 
    groupByKey. 
    mapValues(_.sum) 
wordCountsWithGroup.collect 
res1: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) 

val wordCountsWithReduce = wordPairsRDD. 
    reduceByKey(_ + _) 
wordCountsWithReduce.collect 
res2: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) 

この例では、x => x.sum(すなわち_.sum)をmapValuesで使用される場合、それは(acc, x) => acc + xだろう(すなわち_ + _)reduceByKey。関数のシグネチャは大きく異なります。 mapValuesでは、グループ化された値のコレクションを処理していますが、reduceByKeyでは削減を行っています。

+0

私は理解しているように、RDDのグループ化リストを得るために、 'reduceByKey'はsumのような集計演算を意味するため、常に' groupKey'を使う必要があります。したがって、私の場合、 'reduceByKey'は正しくできませんか? – Vishnu

+0

質問のコメントセクションで説明を読み直した後、 'groupByKey'がおそらく行く方法だと思います。私は、削減がタスクの正しいツールだとは思わないからです。 –

関連する問題