2017-12-02 6 views
0

私は、オブジェクトのコレクションに対して高価なマップ操作を最小限に抑えることを含む最適化の問題に取り組んでいます。スパーク短絡、並べ替え、および遅延マップ

素朴なソリューションが

rdd.map(expensive).min() 

ようなものになるだろうしかし、マップ機能は> = 0だから、であることが保証値を返す任意の単一の結果が0であれば、私は答えとしてそれを取ることができ、残りの地図操作を計算する必要はありません。

Sparkを使用してこれを行うための慣用方法はありますか?

答えて

2

Sparkを使用してこれを行う慣用方法はありますか?

いいえ、このような低レベルの最適化に関心がある場合は、Sparkは最適な方法ではありません。完全に不可能というわけではありません。

あなたは、例えば、このような何か試すことができた場合:

rdd.cache() 
(min_value,) = rdd.filter(lambda x: x == 0).take(1) or [rdd.min()] 
rdd.unpersist() 

短絡パーティション:

def min_part(xs): 
    min_ = None 
    for x in xs: 
     min_ = min(x, min_) if min_ is not None else x 
     if x == 0: 
      return [0] 
    return [min_] in min_ is not None else [] 

rdd.mapPartitions(min_part).min() 

が通常必要、それぞれ与え、わずかに異なるパフォーマンスプロファイルよりも多くを実行しますが、スキップすることができます両方をいくつかのレコードを評価する。珍しいゼロで、最初の方が良いかもしれません。

アキュムレータのアップデートを聞くこともでき、sc.cancelJobGroupは0が表示された後に使用できます。同じようなアプローチの例がありますIs there a way to stream results to driver without waiting for all partitions to complete execution?

0

実際にが高価な場合は、「高価」の結果をSQL(またはすべてのワーカーが利用できる他のストレージ)に書き込むことができます。 次に、「高価」の先頭で現在格納されている番号をチェックし、ゼロの場合は高価な部分を実行せずに「高価」からゼロを返します。

あなたは時間を節約できますが、「グローバル」ではない作業者ごとに、このローカルを実行することもできます。

関連する問題