2016-05-04 8 views
9

私はApache Sparkの学習者で、RDDアクションaggregateに出くわしました。どのように機能するのか分かりません。いくつかのいずれかが綴る、私たちは、コードについては、以下の結果に到達しなかったかのステップで、詳細ステップで説明することができ、ここでRDD Aggregate in spark

RDD input = {1,2,3,3} 

RDD Aggregate function : 

rdd.aggregate((0, 0)) 
((x, y) => 
(x._1 + y, x._2 + 1), 
(x, y) => 
(x._1 + y._1, x._2 + y._2)) 

output : {9,4} 

おかげ

答えて

18

あなたはどのようなことが起こっていることに従うのが最善であるかわからない場合タイプ。あなたはaggregateRDD[T]からUにマップする関数であることがわかりますすべての追加のパラメータを無視した場合は簡潔にするために、暗黙のClassTag省略我々はこの

abstract class RDD[T] extends Serializable with Logging 

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U 

のようなもので始まります。つまり、入力値RDDの値の型が出力値の型と同じである必要はありません。だから、はっきり例reduceよりも異なります。

def reduce(func: (T, T) ⇒ T): T 

またはfold

def fold(zeroValue: T)(op: (T, T) => T): T 

foldと同じ、aggregatezeroValueが必要です。どのようにそれを選択するには?それはcombOpに関してアイデンティティ(ニュートラル)要素でなければなりません。

はまた、二つの機能を提供する必要があります。

(U, T)から (U, U)からわずかこれはあなたがすでに表示されるはず署名に基づい U

にマップU

  • combOpにマップ
    • seqOpseqOpのみが生データにアクセスすることができます。タイプUの値をとり、タイプTの別の値をとり、タイプUの値を返します。あなたのケースでは、それは次の署名を持つ関数です

      この時点では、何らかの折りたたみ操作に使用されていると思われます。

      第2の関数は、タイプUの2つの引数をとり、タイプUの値を返します。前に述べたように、元のデータには触れず、すでにseqOpによって処理された値に対してのみ操作できます。あなたの場合、この関数は次のような署名を持っています:

      ((Int, Int), (Int, Int)) => (Int, Int) 
      

      どうすればそれらを一緒に得ることができますか?

      1. まず各パーティションはrespectivelly zseqopcombopとして渡さzeroValueseqOpcombOpIterator.aggregate標準使用して集約されます。内部aggregateを上書きしない各パーティションから収集された単純なfoldLeft(zeroValue)(seqOp)

      2. 次に部分的な結果をcombOp

      を使用して集計されたように、それが実行されるべき使用InterruptibleIterator入力RDDは、以下を有する3つのパーティションを有していると仮定しますので値の分布:

      • Iterator(1, 2)
      • このように見ることができ、単一パーティションの

        val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1) 
        val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2) 
        
        Seq(Iterator(1, 2), Iterator(3, 3), Iterator()) 
            .map(_.foldLeft((0, 0))(seqOp)) 
            .reduce(combOp) 
        

        foldLeft

      • Iterator()

      あなたはこのようなものに相当するだろう、絶対順序を無視して、その実行を期待することができます

      Iterator(1, 2).foldLeft((0, 0))(seqOp) 
      Iterator(2).foldLeft((1, 1))(seqOp) 
      (3, 2) 
      

      およびすべてのパーティション

      これはあなたがすべてのあなたがニュートラル値、パーティションごとの値を処理するために使用される機能や関数を渡すスパークの上にあります一般的なパターンである。一般に

      (3 + 6 + 0, 2 + 2 + 0) 
      (9, 4) 
      

      :あなたを与える組み合わせ

      Seq((3,2), (6,2), (0,0)) 
      

      は、結果を観察しました異なるパーティションの部分集合をマージするために使用されます。他のいくつかの例としては、

      • aggregateByKey
      • ユーザー定義集計関数スパークDatasets
      • Aggregatorsを。ここで
  • 1

    あなたの参照のための私の理解です:

    次の2つのノードを持っている想像し、1は最初の2つのリストの要素{1,2}の入力を取り、別のは、{3,3}をとります。 (最初のxは、0 "(X、Y)=>(x._1 + Y、x._2 + 1)":第1のノードにおいて

    (ここでパーティションのみ便利のためのものです) (0 + 1、0 + 1)を出力し、次に2番目の要素y = 2と出力(1 + 2,1 + 1)を出力します。 (3,2)

    2番目のノードでは、同じ手順が並行して実行され、(6,2)となります。

    (x、y)=>(x._1 + y._1、x._2 + y._2)」は、2つのノードをマージするように指示します。 (0,0)で注目に値する


    一つのことは、実際に結果 長(RDD)1回に追加されます。

    "scala> rdd.aggregate((1,1))((x、y)=>(x._1 + y、x._2 + 1)、(x、y)=>(x。 (Int、Int)=(14,9) "