私は9ノードm3.xlarge(8 cpu/15ギガ)のEMRクラスターを持っています。ここで1ノードはマスターで、他の8つはスレーブです。 GraphX接続コンポーネントをチェックする簡単なプログラムを実行しようとしています。私が使用してEMRクラスタ上でjarファイルを提出スパーク - 完了に時間がかかるSimple GraphXプログラム
def main(args : Array[String]): Unit = {
val sparkConfig = new SparkConf()
.set("hive.exec.dynamic.partition", "true")
.set("hive.exec.dynamic.partition.mode", "nonstrict")
.set("hive.s3.max-client-retries", "50")
.set("hive.s3.max-error-retries", "50")
.set("hive.s3.max-connections", "100")
.set("hive.s3.connect-timeout", "5m")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrationRequired", "true")
.set("spark.kryo.classesToRegister", "org.apache.spark.graphx.impl.VertexAttributeBlock")
.set("spark.broadcast.compress", "true")
.set("spark.default.parallelism", "24")
val spark = SparkSession.builder()
.appName("Spark Hive Example")
.enableHiveSupport()
.config(sparkConfig)
.getOrCreate()
// Set Kryo for serializing
GraphXUtils.registerKryoClasses(sparkConfig)
val res = spark.sql("SELECT col1, col2, col3 FROM table1 limit 10000")
val edgesRDD = res.rdd.map(row => Edge(row.getString(0).hashCode, row.getString(1).hashCode, row(2).asInstanceOf[String]))
val res_two = spark.sql("SELECT col1 FROM table2 where col1 is not NULL and col1 != '' limit 100000")
val vertexRDD: RDD[(VertexId, String)] = res_two.rdd.map(row => (row.getString(0).hashCode, row(0).asInstanceOf[String]))
val graph = Graph(vertexRDD, edgesRDD)
graph.cache()
val connectedComponents = graph.connectedComponents().vertices
:これは私のコードでTABLE1とtable2の両方
spark-submit --conf spark.hadoop.fs.s3a.access.key=xxx --conf spark.hadoop.fs.s3a.secret.key=xxx --conf spark.yarn.submit.waitAppCompletion=false --class com.mypkg.SampleGraphX --master yarn --deploy-mode cluster --num-executors 12 --executor-cores 6 --executor-memory 10g --conf "spark.driver.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60" --conf "spark.executor.extraJavaOptions=Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60" GraphxTest1.jar
を数百万のエントリを持っていますが、私は10000と100000読むために私のコードが限られていますそれらからのエントリ。どちらもS3にある外部テーブルです。この仕事は、ほぼ2日間実行されています。どうしてそんなに長い時間をかけているのですか?私のコードに何か問題がありますか?あるいは、いくつかの設定を変更する必要がありますか?
また、UIを見ると、クラスタに64個のvCore(ノードあたり8コア)があっても、各ノードは1コアしか使用していないことがわかります。 以下の画像では、合計64のうち3つのコアのみが使用されています。私はこれが最大6つのコア(6つのノードがアクティブだったとき)に行くのを見ました。なぜそれはすべてのコアを使用していないのですか?
私はSparkとGraphXの両方に新しいので、私が間違っていることはわかりません。
これは、 '17/08/22 23:17:48 WARN YarnSchedulerBackend $ YarnSchedulerEndpoint:失敗したとマークされたコンテナ:container_1502391082530_0027_01_000007 on host:....終了ステータス:1.診断:コンテナの起動からの例外。 コンテナID:container_1502391082530_0027_01_000007 終了コード:1 ...コンテナは、非ゼロの終了コード1 'で終了しました。それはいくつかの設定の問題ですか?あまりにも多くのメモリを使用していますか? – drunkenfist