2017-07-26 11 views
1

私は私を驚きだスパークジョブのエラーを取得しています:私の仕事は、このようなものですどのように減らす前に大きな中間結果を避けるために?

Total size of serialized results of 102 tasks (1029.6 MB) is 
bigger than spark.driver.maxResultSize (1024.0 MB) 

def add(a,b): return a+b 
sums = rdd.mapPartitions(func).reduce(add) 

RDDは〜500パーティションおよびfuncがそのパーティション内の行を取りましたし、大きな配列を返します(1.3Mの倍数の配列、つまり〜10Mb)。 私はこれらの結果を合計して合計を返したいと思います。

スパークは、mapPartitions(func)の合計結果をインクリメンタルに処理するのではなく、約30MBしか必要としないメモリ(約5GB)で保持しているようです。

spark.driver.maxResultSizeを増やす代わりに、reduceを徐々に実行する方法はありますか?


アップデート:実際には、2つ以上の結果がメモリに保持されていることにもっと驚いています。

答えて

3

特に驚くべきことはありません。 reduceを使用する場合、Sparkはドライバの最終的な削減を適用します。あなたはtreeReduceを使用することができ

reduce(add, rdd.collect()) 

import math 

# Keep maximum possible depth 
rdd.treeReduce(add, depth=math.log2(rdd.getNumPartitions())) 

またはtoLocalIterator

sum(rdd.toLocalIterator()) 

前者は、再帰的にパーティションをマージしますfuncは、単一のオブジェクトを返す場合、これはに効果的に同等です労働者は、ネットワーク交換の増加を犠牲にしている。 depthパラメータを使用してパフォーマンスを調整できます。

後者の場合は、その時点で1つのパーティションしか収集されませんが、rddの再評価が必要な場合があり、ドライバの作業のかなりの部分が実行されます。

BlockMatrices

を使用して、あなたはまた、例えば、ブロックに行列を分割し、副ブロック付加を行うことにより、作業の分布を改善することができる funcで使用される正確なロジックに依存
関連する問題