2016-01-11 9 views
15

に設定されている私は、スパーク1.6を使用して、私は次のコードを実行すると上記の問題に遭遇しています:スパーク1.6:java.lang.IllegalArgumentExceptionが:spark.sql.execution.idはすでに

// Imports 
import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SaveMode 
import scala.concurrent.ExecutionContext.Implicits.global 
import java.util.Properties 
import scala.concurrent.Future 

// Set up spark on local with 2 threads 
val conf = new SparkConf().setMaster("local[2]").setAppName("app") 
val sc = new SparkContext(conf) 
val sqlCtx = new HiveContext(sc) 

// Create fake dataframe 
import sqlCtx.implicits._ 
var df = sc.parallelize(1 to 50000).map { i => (i, i, i, i, i, i, i) }.toDF("a", "b", "c", "d", "e", "f", "g").repartition(2) 
// Write it as a parquet file 
df.write.parquet("/tmp/parquet1") 
df = sqlCtx.read.parquet("/tmp/parquet1") 

// JDBC connection 
val url = s"jdbc:postgresql://localhost:5432/tempdb" 
val prop = new Properties() 
prop.setProperty("user", "admin") 
prop.setProperty("password", "") 

// 4 futures - at least one of them has been consistently failing for 
val x1 = Future { df.write.jdbc(url, "temp1", prop) } 
val x2 = Future { df.write.jdbc(url, "temp2", prop) } 
val x3 = Future { df.write.jdbc(url, "temp3", prop) } 
val x4 = Future { df.write.jdbc(url, "temp4", prop) } 

がここにありますgithubのの要旨:https://gist.github.com/karanveerm/27d852bf311e39f05491

私が手にエラーがある:/回避策

org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1482) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:247) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:306) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0] 
     at writer.SQLWriter$.writeDf(Writer.scala:75) ~[temple.temple-1.0-sans-externalized.jar:na] 
     at writer.Writer$.writeDf(Writer.scala:33) ~[temple.temple-1.0-sans-externalized.jar:na] 
     at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:460) ~[temple.temple-1.0-sans-externalized.jar:2.4.6] 
     at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:452) ~[temple.temple-1.0-sans-externalized.jar:2.4.6] 
     at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) ~[org.scala-lang.scala-library-2.11.7.jar:na] 

では、これはスパークのバグですか、私が何か間違ったことをやっていますか?

+0

このコードを実行したマシンは何ですか?私は特にCPU(コア数)に興味がありますか? –

+0

OSXエルキャピタン10.11.1 | MacBook Air(13インチ、2014年初め)| 1.7 GHz Intel Core i7 | 8GB 1600 MHz DDR3(私はi7が4コアだと信じています) – sparknoob

+0

興味深いことに、これはsparkシェルのような同様の設定では再現できません。これはいくつかの厄介なバグかもしれません。以前はID生成に問題がありました。そのためにJIRAを作成することもできます。 –

答えて

0

テスト1:df.write操作を並行して行うのではなく、順次実行すると役立ちますか?

テスト2:データフレームを永続化してから、すべてのdf.write操作を並行して実行し、すべてが完了した後にペナルティ化して、これが役立つかどうかを確認すると役立ちますか?

1

いくつかのことを試した後、私はグローバルForkJoinPoolによって作成されたスレッドのいずれかがランダムな値にそのspark.sql.execution.idプロパティセットを取得することがわかりました。 私は実際にそのプロセスを特定できませんでしたが、自分でExecutionContextを使用して回避することができました。

import java.util.concurrent.Executors 
import concurrent.ExecutionContext 
val executorService = Executors.newFixedThreadPool(4) 
implicit val ec = ExecutionContext.fromExecutorService(executorService) 

コードをhttp://danielwestheide.com/blog/2013/01/16/the-neophytes-guide-to-scala-part-9-promises-and-futures-in-practice.htmlから使用しました。 ForkJoinPoolクローンは新しいものを作成するときに属性をスレッド化することがあります.SQL実行のコンテキストで発生すると、null以外の値が返されますがFixedThreadPoolはインスタンス化時にスレッドを作成します。

+0

私は同じ問題を抱えています。しかし、この解決策は役に立たないようです。まだ 'spark.sql.execution.id already set'エラーが表示されます。 – mottosan

+0

@Knshiro、Executors.newFixedThreadPool(1)であってはいけませんか? – smas

+0

@smas問題はスレッドの数ではなく、それらのスレッドの初期化にあります。フォーク結合プールは、ランダムな時間にスレッドを初期化し、すべての属性をクローンする新しいスレッドを初期化します。したがって、新しいスレッドの初期化時に、既存のスレッドにSQL実行IDが設定されている場合、新しいスレッドを生成させる代わりに新しいスレッドにコピーします。 – Knshiro

1

、チェックSPARK-13747

は、該当する場合は、ご使​​用の環境にスパークバージョン2.2.0以降を使用することを考えてみてください。

関連する問題