2017-04-07 7 views

答えて

1

これはthe official documentation of aggregate(別名scaladoc)にここスパーク2.1.0(あまり重要ではないはずですが、...)

ゴーあると読み:

0123を

与えられた結合関数とニュートラル "ゼロ値"を使用して、各パーティションの要素を集計し、すべてのパーティションの結果を集計します。この関数は、このRDD、Tの型とは異なる結果型Uを返すことができます。したがって、scala.TraversableOnceのように、TをUにマージする1つの演算と、2つのUをマージする1つの演算が必要です。これらの関数の両方は、メモリ割り当てを避けるために新しいUを作成する代わりに、最初の引数を変更して返すことができます。次のように

署名は(特に面白くないとして暗黙パラメーターを除去)である:

aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U 

scaladocは言う:

zeroValueの累積結果の初期値seqOp演算子の各パーティションと、combOp演算子の異なるパーティションの結合結果の初期値も含まれます。これは通常、中立要素です(例:リスト結合の場合はNil、合計の場合は0)

あなたの場合、zeroValue(0, 0)です。

seqOpは、オペレータはseqOpが、私は呼びたい残念ながらxyという名前の2つのペアを、(受諾関数である(x, y) => (x._1 + y, x._2 + 1)で、あなたのケースでは、パーティション

以内に結果を蓄積するために使用p1p2は、少なくともパターンマッチングと部分的な機能を使用しても、つまりcase ((x1, y1), (x2, y2)) => ...です。あなたはnパーティション(あなたがrdd.getNumPartitionを使用して、それをチェックアウトすることができます)、seqOpを持って考えると

n回呼ばれようとしています。

scaladocは言う:

combOp結合演算子がcombOpseqOpのすべての結果を組み合わせて、関数を適用することを意味し、異なるパーティション

からの結果を組み合わせるために使用:

(x, y) => (x._1 + y._1, x._2 + y._2) 

あなたがあまりにも多くを見て、私も電話するだろうと書かれていますノイズ。私は次のように関数を記述したい:

{ case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) } 

は種類に従い、適切な名前を付け、そして最終的にはScalaではすべてがはるかに容易になり;-)

3

の結果を生成するの下に基本的集合体が言う方法の詳しい説明を与えることができます:私たちがしたいタプルを(a、b)がある場合すべての要素の合計であり、bはその数です。

これは、(0,0)に初期化することによって行われ、その後、我々は2つの機能を持っている:

タプルがある。すなわち、我々は、一度に単一の要素を取得するときに最初の関数は、単に加算を行い
  • 第1の要素に値を加算し、第2の要素に1(カウント)を加えることによって、単一の要素から更新される。それだけの要素が賢明加え

は、入力データの例を考えてみましょないよう

  • 第二の機能は二つの結果をマージします:

    は1,2は、パーティション1と3であると言うことができます図3は、パーティションにある3

    パーティション1計算

    パーティション1希望STA rtを(0,0)で置き換えます。

    次に、最初の関数が機能し始めます。

    これを追加すると(1,1)が得られます。最初の要素は合計(0 + y、yは1)で、2番目の要素はカウント(0 + 1)です。

    ここで、(1 + 2,1 + 1)=(3,2)となるように2を追加します。再び最初の要素はこれまでに見た値の合計であり、2番目の要素はその数です。

    パーティション2計算

    第二区画に、我々は再び(0,0)で開始し、その後、我々は第二から第3及び(6,2)から(3,1)を得ます。今、第二の機能は、2つをマージする場に出た結果

    をマージ

    : 我々は両方の要素を合計すると、(9,4)

    を取得することにより、マージ(3,2)と(6,2)
  • 関連する問題