2017-03-31 7 views
1

マップとして要素を持つRDDがあります。もちろん、RDD.getは使用できません。だから、今のように、私はこのマップのキーの値を取得するには、次の操作を行います。スカラのマップのrddから鍵の値を取得する

val x = RDD.collect().flatten.toMap 

、その後

x.get(key) 

のキーの値を取得します。今、rddに.collect()を適用しているので、エラーjava.lang.OutOfMemoryError: GC overhead limit exceededを出力する本当に大きなrddがあります。 .collect()をrddに適用せずにどうすればいいですか?あなたがに見る必要があるマップのRDDをフィルタリングするためにあなたのドライバ、あなたの最初の必要性にすべてのものに合わせて、それが本当にあるならば取得...

val rdd = sc.parallelize(List(Map("a"->1,"b"->2),Map("c"->3,"d"->4))) 

val key = "d" 

val filteredRDD = rdd.filter(_.keySet contains key) 

if (!filteredRDD.isEmpty) filteredRDD.first.get(key) else None 
+0

にフィルタだけフィルタにflatMapを回すことができますしたいはずです予想される出力を含む例? – mtoto

+0

私に飛びつくことの一つは、収集(「行動」)をあまりに早く呼び出すことです。あなたはあなたのRDDを(うまくいけば)小さなRDDに変換する必要があります - 基本的には、それらの要素を望むキーで欲しいだけです - あなたのRDDにわずかな要素しか持たない最後の時点でcollectを呼び出します。 – Phasmid

答えて

0

Map Sはその後、次の操作を実行できます。

rdd.flatMap(identity).lookup(key) 

これはまだ意志ドライバへの出力が、そのキーから値のみが、。だから、もしそれが記憶に収まるならば、あなたはそれでうまくいくのです。しかし、あなたはその後、まだRDDとしてそれで作業する場合:

rdd.flatMap(identity) 
    .flatMap{case (key, value) => if(key == myKey) Some(value) else None} 

そして、あなたはキーと値その後、あなたは、再現を共有することができkey == myKey

2

を行うことはできませんので

+0

私は 'ルックアップ 'について知らなかった、それは明らかにここで使うのに適している。しかし、2番目の 'flatMap'は' filter'のように見えます。 –

+0

Thanks @CyrilleCorpet私はflatMapを選んだ理由を明示するように答えを修正しましたが、場合によってはフィルタを与えました。 –

+0

@JustinPihony、上記の方法は動作しますが、次にrdd2の中で "rdd.flatMap(identity).lookup(key)"を使う必要があります。 rdd2の各要素について、 "rdd"でその値を調べなければなりません。スローされたエラーは、このRDDにはSparkContextがありませんでした。次の場合に発生する可能性があります。(1)RDD変換およびアクションがドライバによって呼び出されないが、他の変換の内部で実行される。例えば、rdd1.map(x => rdd2.values.count()* x)は、rdd1.map変換の内部で値変換とカウント動作を実行できないため無効です –

関連する問題