2016-12-09 4 views
0

私はSpark 1.6.3、Scala 2.11.8を使用していました。スパーク1.6.3 rdd.foreachのブロードキャスト変数が多すぎる

私は、ブロードキャストをrdd.foreachで使用すると、信じられないほどの時間がかかりますが、実際には "FINISHED"には到着しませんでした。

val histDS = sc.textFile(args.head) 
    .map(_.split("\t")) 
    .filter(r => r(17).length > 0 && r(18).length > 0) 
    .map(r => HistoryRecord(r(22), r(17).toLong)) 


val cycle = sc.broadcast[Cycle](Cycle()) 

for (rec: HistoryRecord <- histDS) { 
// do something, cycle works as global variable 
} 

次多分それは約10分を実行し続けるよう

主なコードがあり、私は、プロセスを停止するが、私は唯一の次のコードのようにforループで値を印刷する場合、それが正常に動作します。

for (rec: HistoryRecord <- histDS) { 
println(rec) 
} 

次に、次のコードを使用して、rddを使用してみました。 collect()関数は、 "for"ループで使用される配列を取得します。

val histDS = sc.textFile(args.head) 
    .map(_.split("\t")) 
    .filter(r => r(17).length > 0 && r(18).length > 0) 
    .map(r => HistoryRecord(r(22), r(17).toLong)) 
    .collect() 

このコードは正常に実行され、約2分で終了します。

だから誰でも知っていますか? collect()は、rddを使用するのと比べてパフォーマンスに影響します。 foreach

答えて

0

はコレクト実行スパークScalaのAPIドキュメントで説明し、

を多くのメモリ資源の費用がかかります)(収集することができ、アプリケーションのドライバ・プロセスにすべてのデータを移動し、非常に大規模なデータセットでそうすることが必要ですOutOfMemoryErrorを使用してドライバプロセスをクラッシュさせます。変数を放送

、ない書き込みため、放送を好きアキュムレータは、書き込みのために使用することができる読むだけ状況のために使用するのが最適ですが、複雑なコンピューティングに合うことができません。

原則: 1. 使用マップ()RDDは、ワーカーノード上で実行されますので、その中に、我々は、ドライバの変数を取得することはできませんのでONLY READのために放送変数を使用し、より多くの機能 1.好き。

0

マップやフィルタのような変換は、アクションが実行できないときにのみ実行されます。

それぞれがスパークの変形または動作ではありません。 collectはアクションであり、これは計算結果をドライバに返すのに役立ちます。

上記の方法は、sparkによる遅延評価と呼ばれます。すべての変換は、アクションが呼び出されたときに実際に実行されます。これは

0
.Collect() 

を助け

希望はRDD CollectAPi

foreach(func) 

から結果を提供しているアクションがスパーク文書Sparkhome に応じてアクションの各要素に対して関数funcを実行されていますデータセットこれは通常、Accumulatorの更新や外部ストレージシステムとの対話などの副作用に対して行われます。 注:foreach()以外のAccumulators以外の変数を変更すると、未定義の動作が発生する可能性があります。 Foreachexample

関連する問題