2017-02-21 5 views
0

私はfilterを使用した後、スパークRDD.isEmptyを使用しました。しかし、isEmptytake(1)のアクション操作であるため、大きなデータには時間がかかると思います。RDDのチェック方法は効率的ですか?

ここでは例のコードです。

val data = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))) 
.reduceByKey(_+_) 
.filter(_._2 > 5) 
//Array[(String, Int)] = Array() 

if(!data.isEmpty()){ 
    //running code... 
} 

データ値が空であるかどうかを確認する効率的な方法はありますか?

+0

なぜループ内でdata.count()を使用しないのですか? – CoDhEr

+0

大きなデータには適していないと思います。 –

+0

*しかし、大量のデータでは時間がかかると思います。*これは腸のような感じです。あなたのコードをベンチマークしましたか?これが実際のボトルネックですか? –

答えて

4

RDD.isEmptyが最も効率的です。可能な限りの作業を最小限に抑えます。

RDDはデータではなく、実行計画です。 RDDに実際にデータを評価せずにデータがあるかどうかを確認することはできないため、アクションを実行して結果を確認する必要があります。

最後のアクションの前にRDDが空であるかどうかを確認する必要がある場合は、persistを最初にキャッシュして、それ以降のジョブで再評価する必要はありません。

3

すでに述べたように、rdd.isEmptyはアクションです。if条件を削除するコードをリファクタリングしないと(命令型スタイルではなくより多くの機能スタイルを使用して)、非効率性を取り除くことはできません。既に示唆したように、最も簡単な解決策は、isEmptyを呼び出す前にrddをキャッシュすることです。これにより、十分なキャッシュメモリがあれば変換が一度だけ実行されます。

私が気付いていないように、 "if(data.isEmpty())"の唯一の提案は、rdd.map、rdd.foreachなどは空でも完全に有効ですRDD。 "if(data.isEmpty())"がソリューションに近づく一つの方法かもしれません。問題の詳細を伝えることができれば、機能的アプローチを提案することができます。

+0

返信いただきありがとうございます! 私は 'cache'をどのように使うことができますか? 'val data = sc.parallelize(Seq(" a "、1)、(" a "、2)、(" b "、2)、(" b "、3)、(" c "、1 ))))) .reduceByKey(_ + _) .filter(_._ 2> 5).cache'あなたは何を意味していますか? –

+0

それはまさに私が意味していたものです。 – rakesh

関連する問題