2017-08-24 8 views
0

私は9ノードm3.xlarge(8 cpu/15ギガ)のEMRクラスターを持っています。ここで1ノードはマスターで、他の8つはスレーブです。 GraphX接続コンポーネントをチェックする簡単なプログラムを実行しようとしています。私が使用してEMRクラスタ上でjarファイルを提出スパーク - 完了に時間がかかるSimple GraphXプログラム

def main(args : Array[String]): Unit = { 

    val sparkConfig = new SparkConf() 
     .set("hive.exec.dynamic.partition", "true") 
     .set("hive.exec.dynamic.partition.mode", "nonstrict") 
     .set("hive.s3.max-client-retries", "50") 
     .set("hive.s3.max-error-retries", "50") 
     .set("hive.s3.max-connections", "100") 
     .set("hive.s3.connect-timeout", "5m") 
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
     .set("spark.kryo.registrationRequired", "true") 
     .set("spark.kryo.classesToRegister", "org.apache.spark.graphx.impl.VertexAttributeBlock") 
     .set("spark.broadcast.compress", "true") 
     .set("spark.default.parallelism", "24") 

    val spark = SparkSession.builder() 
     .appName("Spark Hive Example") 
     .enableHiveSupport() 
     .config(sparkConfig) 
     .getOrCreate() 

    // Set Kryo for serializing 
    GraphXUtils.registerKryoClasses(sparkConfig) 
    val res = spark.sql("SELECT col1, col2, col3 FROM table1 limit 10000") 
    val edgesRDD = res.rdd.map(row => Edge(row.getString(0).hashCode, row.getString(1).hashCode, row(2).asInstanceOf[String])) 

    val res_two = spark.sql("SELECT col1 FROM table2 where col1 is not NULL and col1 != '' limit 100000") 
    val vertexRDD: RDD[(VertexId, String)] = res_two.rdd.map(row => (row.getString(0).hashCode, row(0).asInstanceOf[String])) 

    val graph = Graph(vertexRDD, edgesRDD) 
    graph.cache() 

    val connectedComponents = graph.connectedComponents().vertices 

:これは私のコードでTABLE1とtable2の両方

spark-submit --conf spark.hadoop.fs.s3a.access.key=xxx --conf spark.hadoop.fs.s3a.secret.key=xxx --conf spark.yarn.submit.waitAppCompletion=false --class com.mypkg.SampleGraphX --master yarn --deploy-mode cluster --num-executors 12 --executor-cores 6 --executor-memory 10g --conf "spark.driver.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60" --conf "spark.executor.extraJavaOptions=Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60" GraphxTest1.jar 

を数百万のエントリを持っていますが、私は10000と100000読むために私のコードが限られていますそれらからのエントリ。どちらもS3にある外部テーブルです。この仕事は、ほぼ2日間実行されています。どうしてそんなに長い時間をかけているのですか?私のコードに何か問題がありますか?あるいは、いくつかの設定を変更する必要がありますか?

また、UIを見ると、クラスタに64個のvCore(ノードあたり8コア)があっても、各ノードは1コアしか使用していないことがわかります。 以下の画像では、合計64のうち3つのコアのみが使用されています。私はこれが最大6つのコア(6つのノードがアクティブだったとき)に行くのを見ました。なぜそれはすべてのコアを使用していないのですか?

私はSparkとGraphXの両方に新しいので、私が間違っていることはわかりません。 enter image description here

答えて

0

あなたはSpark UIを見ていません。これはスケジューラのUIです。あなたのアプリケーションはリストにあり、アプリケーションにリンクされているURLにアクセスする必要があります。 Spark UIにアクセスすると、何が起きているのかを簡単に伝えることができます(ストレージに十分なメモリがない、パーティションが足りない、データを読み込めない、繰り返しでスタックした...)

+0

これは、 '17/08/22 23:17:48 WARN YarnSchedulerBackend $ YarnSchedulerEndpoint:失敗したとマークされたコンテナ:container_1502391082530_0027_01_000007 on host:....終了ステータス:1.診断:コンテナの起動からの例外。 コンテナID:container_1502391082530_0027_01_000007 終了コード:1 ...コンテナは、非ゼロの終了コード1 'で終了しました。それはいくつかの設定の問題ですか?あまりにも多くのメモリを使用していますか? – drunkenfist

関連する問題