2016-06-15 8 views
1

私のSparkクラスタには1人のマスターと2人のワーカーがいます。アプリケーションはc3vファイルをs3からDataFramesに読み込み、テンポラリテーブルとして登録し、sqlContextを使用してsqlクエリを実行して新しいDataFramesを作成します。その後、これらのDFはMySql DBに保存されます。これらのジョブはすべて複数のノードで実行されています。1つのエグゼキュータのみを使用してスパーク・ジョブの1つだけを実行するのはなぜですか?

しかし、これらのテーブルをDBからDataFramesに読み込んで一時テーブルとして登録し、sqlContextクエリを実行すると、すべての処理が1つのノードでのみ行われます。これを引き起こす原因は何ですか?ここで

は、私のコードの例です:

DataFrame a = sqlContext.read().format("com.databricks.spark.csv").options(options) 
       .load("s3://s3bucket/a/part*"); 
DataFrame b = sqlContext.read().format("com.databricks.spark.csv").options(options) 
       .load("s3://s3bucket/b/part*"); 

a.registerTempTable("a"); 
b.registerTempTable("b"); 

DataFrame c = sqlContext.sql("SELECT a.name, b.name from a join b on a.id = b.a_id"); 

c.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "c", prop); 

// other jobs are similar 

Map<String, String> dOptions = new HashMap<String, String>(); 
dOptions.put("driver", MYSQL_DRIVER); 
dOptions.put("url", MYSQL_CONNECTION_URL); 

dOptions.put("dbtable", "(select * from c) AS c"); 
rC= sqlContext.read().format("jdbc").options(dOptions).load(); 
rC.cache(); 

dOptions.put("dbtable", "(select * from d) AS d"); 
rD= sqlContext.read().format("jdbc").options(dOptions).load(); 
rD.cache(); 

dOptions.put("dbtable", "(select * from f) AS f"); 
rF= sqlContext.read().format("jdbc").options(dOptions).load(); 
rF.cache(); 

rC.registerTempTable("rC"); 
rD.registerTempTable("rD"); 
rF.registerTempTable("rF"); 

DataFrame result = sqlContext.sql("SELECT rC.name, rD.name, rF.date from rC join rD on rC.name = rD.name join rF on rC.date = rF.date"); 

result.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "result_table", prop); 
+1

あなたの仕事は何を提出していますか? 「1つのノード」を意味する場合、マスターUIで1人のワーカーしか見ることができないということですか? –

+0

@Hawknightジョブを送信するためにspark-submitを使用しています。完全なコマンドは次のとおりです。 "spark-submit --class MyClass --deploy-mode cluster s3://bucket/file.jar"私はGangliaとSpark UIを経由して監視しています。どちらも労働者を認識し、私はいくつかの仕事がパラレルで実行されることがわかります。しかし、私が上記に投稿した仕事の中でパラレル化は中断し、特定のタスク(ステージ内)は1つのワーカーノードによってのみ実行されます。ここにそれを示すGanglia UIのスクリーンショットがあります。 http://pokit.org/get/img/2a5bcd853b97aad2bc9e86a90c9b2733.png – KikiRiki

+0

コードだけに基づいて、なぜ特定の段階で1人の労働者しか働かないのかを知ることは非常に難しいです。どの段階で問題を正確に突き止めることができますか?問題はどのような課題ですか? –

答えて

0

あなたは私たちとあなたのSparkConf()オブジェクトを共有してもらえますか?

SparkConf()オブジェクトには、Sparkアプリケーションの構成が含まれています。

-heapメモリエグゼキュータコアの-theマスター

個エグゼキュータの

個が

を割り当て

:次のようなキーと値のペアのような種々のスパークパラメータを設定するために使用され

-other ..

+1

ここに私のSparkConfオブジェクト: SparkConf conf = new sparkConf()。setAppName( "org.spark.SchemaTransformer")。setMaster( "糸クラスター"); 実行環境: spark.master = yarn executor.cores = 4 executor.memory = 5120M – KikiRiki

+1

私はいくつかの議論を欠場すると思います。以下の方法を追加してみてください。 (spark.submit.deployMode "、" cluster ") SparkConf()。(" master "、" yarn ")\ .set(" spark.submit.deployMode "、" cluster ") .set" spark.executor.instances "、" 8 ( "spark.executor.memory"、 "5120M")\ .set( "spark.executor.cores"、 "4" ( "spark.yarn.driver.memoryOverhead"、 "10000M")\ .set( "spark.yarn.memoryOverhead"、 "10000M")\ .set – theudbald

関連する問題