2016-08-24 11 views
1

私は4つのノードhadoopクラスタ(mapr)にそれぞれ40GBのメモリを持っています。大きなデータセット(500million行)のフィールドの1つに関数を '適用'する必要があります。私のコードの流れは、私がスパークデータフレームとしてハイブテーブルからデータを読み込み、次のようにいずれかの列に所望の機能を適用することで、以下のように見えるかもしれませんマップ変換のパフォーマンスspark dataframeとRDD

schema = StructType([StructField("field1", IntegerType(), False), StructField("field2", StringType(), False),StructField("field3", FloatType(), False)]) 
udfCos = udf(lambda row: function_call(row), schema) 
result = SparkDataFrame.withColumn("temp", udfCos(stringArgument)) 

同様のRDDバージョン:

result = sparkRDD.map(lambda row: function_call(row)) 

このコードのパフォーマンスを向上させたいのですが、コードが最大並列性とスループットが低下しないようにしています。SparkConfの 'repartition'の並列性の値私の問題の文脈では、他のアプローチがあります。どんな助けもありがとうございます。あなたは、クラスタがどのように多くのリソースを利用中であるかどうかあなたがあなたのアプリケーションを監視する必要があるいくつかのこと

1)を知っておく必要がありますアプリケーションをチューニングするための

MASTER="yarn-client" /opt/mapr/spark/spark-1.6.1/bin/pyspark --num-executors 10 --driver-cores 10 --driver-memory 30g --executor-memory 7g --executor-cores 5 --conf spark.driver.maxResultSize="0" --conf spark.default.parallelism="150" 
+0

を参照することができます例いくつかあります。 – zero323

+0

その提案の背後にある具体的な理由はありますか?私はRDDマップとudfの実行時間を同じにしたサンプルテストを行った(デフォルトで) – Mike

+2

一般的に、この往復JVM - > Python - > JVMは高価で比較的遅く、いくつかの醜いプロパティを持っています(特にSpark <2 )ので、UDFよりもネイティブ(JVM)関数の構成を優先する必要があります。 – zero323

答えて

0

私のスパーク起動パラメータ作成したアプリケーションで使用されます

さまざまなツールを使用して監視できます。 Ganglia GangliaからCPU、メモリ、ネットワークの使用状況を確認できます。

2)CPUとメモリの使用状況に関する観測に基づく、あなたのアプリケーションのための火花defaults.confに

であなた

フォームスパークポイントを必要とするチューニングのどのような良いアイデアを得ることができます

ガベージコレクションアルゴリズムを変更することもできますが、アプリケーションによって必要とされるドライバメモリとエグゼキュータメモリの量はどのようなシリアル化が必要かを指定できます。

以下は、チューニング要件の詳細は、

spark.serializer     org.apache.spark.serializer.KryoSerializer 
spark.executor.extraJavaOptions -XX:MaxPermSize=2G -XX:+UseG1GC 
spark.driver.extraJavaOptions -XX:MaxPermSize=6G -XX:+UseG1GC 

に基づいて、このパラメータは、Python UDFを使用していない手始めにhttp://spark.apache.org/docs/latest/tuning.html

関連する問題