2016-08-24 7 views
3

MapReduceで以下の実装があり、正常に動作していますが、今はFlatMapFunctionを使ってSparkに移植しようとしていますが、この関数はメモリエラーをスローします。 のMapReduce:Sparkフラットマップ関数が "OutOfMemory"をスローしています

String[] hexList = input.toString().split(","); 
    int numHex = (int) Math.pow(9, lLevel_From_config - hLevel_From_config); 
    for (String hex : hexList) { 
     for (int i = 0; i < numHex; i++) { 
      context.write(m_mapKey, generateHexagon(hex, i)); 
     } 
    } 

java.lang.OutOfMemoryError: GC overhead limit exceeded 
    at java.util.HashMap.createEntry(HashMap.java:897) 
    at java.util.HashMap.addEntry(HashMap.java:884) 
    at java.util.HashMap.put(HashMap.java:505) 
    at java.util.HashSet.add(HashSet.java:217) 
    at com.pb.hadoop.spark.hexgen.function.HexGenMapFunction.call(HexGenMapFunction.java:56) 
    at com.pb.hadoop.spark.hexgen.function.HexGenMapFunction.call(HexGenMapFunction.java:21) 
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129) 
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1197) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1205) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

スパーク実装:

@Override 
public Iterable<Tuple3<String, Integer, Integer>> call(Tuple2<Text, IntWritable> tuple) throws Exception { 
String key = tuple._1.toString(); 
String[] hexList = key.split(","); 
int numHex = tuple._2.get(); 
Set<Tuple3<String, Integer, Integer>> hegagons = new HashSet<>(); 
    for (String hex : hexList) { 
     for (int i = 0; i < numHex; i++) { 
      hegagons.add(generateHexagon(hex, i)); //this is line 56 
     } 
    } 
    return hegagons; 
+0

完全なエラーメッセージを投稿できますか? – Bhavesh

+0

こんにちはBhavesh、ここでスタックトレースがある... – Ajeet

+0

あなたもspark.driver.extraJavaOptionsを追加 – Bhavesh

答えて

0

私はスパークでそれを実装するための回避策を見つけました。はい、私には回避策のように見えます。なぜなら、これを実装するのが最良のアプローチではないと感じているからです。

ステップ1:所定のパーティション数でRDDを作成します。

List<Integer> ids = new ArrayList<Integer>(); 
for(int i=0; i< numOfPartitions; i++){ 
    ids.add(i); 
} 

JavaRDD<Integer> seqRDD = javaSparkContext.parallelize(ids, numOfPartitions); 

ステップ2

int rowsPerPartition = numHex/numOfPartitions + 1; 

ステップ3:今すぐnumOfHex RDDSを作成するには、これらのRDDSを費やすパーティションごとの行を取得します。 PartitionMapFunctionの "OutOfMemory"を避けるために、別のパラメータmaxRowsPerPartitionを使用しています。

JavaRDD<Integer> numRDD = null; 
if(rowsPerPartition > maxRowsPerPartition){ 
    numRDD = seqRDD.mapPartitions(new PartitionMapFunction(maxRowsPerPartition));    
    rowsPerPartition = rowsPerPartition/maxRowsPerPartition + 1; 
    numRDD = numRDD.flatMap(new PartitionFlatMapFunction(rowsPerPartition, numHex)); 
} else { 
    numRDD = hexIdRDD.mapPartitions(new PartitionMapFunction(rowsPerPartition)); 
}   

ステップ4:RDDを六角形の整数RDDをtranasformするマップ変換と各RDDにgenerateHexagon(16進数、hexId)を適用します。

String hex = HexgenUtilities.getHexgenKey(sparkConfig); 
Broadcast<String> broadcastedHex = javaSparkContext.broadcast(hex); 
JavaRDD<Hexagon> hexStringRDD = numRDD.flatMap(new HexGenMapFunction(broadcastedHex, lLevel)); 

これはMapReduceの実装を比較して私にとってはうまくいきます。どんな提案やより良いアプローチも可能です。 ありがとう