2015-10-02 18 views
8

Spark 1.4.0を使用して、insertIntoJdbc()を使用して、Spark DataFrameからMemSQLデータベース(正確にはMySQLデータベースとやりとりするようなもの)にデータを挿入しようとしています。しかし、私はRuntime TableAlreadyExists例外を取得し続けます。Spark DataFrame InsertIntoJDBC - TableAlreadyExists例外

まず私はこのようなMemSQLテーブルを作成します。

CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT); 

その後、私はスパークでの簡単なデータフレームを作成し、このようMemSQLに挿入しよう:

val df = sc.parallelize(Array(123,234)).toDF.toDF("val") 
//df: org.apache.spark.sql.DataFrame = [val: int] 

df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false) 

java.lang.RuntimeException: Table table1 already exists. 

答えて

6

@wayneによって答えは、特に、おそらくmemSQLのためのよりよい解決策であるが、この解決策は、一般的なJDBC接続に適用されます。

insertIntoJdbcは1.4.0以降で廃止されているようですが、実際にはwrite.jdbc()を呼び出します。

write()は、DataFrameWriterオブジェクトを返します。テーブルにデータを追加する場合は、オブジェクトの保存モードを"append"に変更する必要があります。

上記の例のもう1つの問題は、DataFrameスキーマがターゲットテーブルのスキーマと一致しないことです。

以下のコードは、Sparkシェルの実例です。 spark-shellセッションを開始するのにspark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jarを使用しています。

import java.util.Properties 

val prop = new Properties() 
prop.put("user", "root") 
prop.put("password", "") 

val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val") 
val dfWriter = df.write.mode("append") 

dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop) 
+2

こんにちは、私はsparkを使用しています1.5と私はまだテーブルを取得していますが、write.mode( "append")と言った後でも例外が存在していますか?これについてコメントしますか?データベースに 'customer_spark'という名前のオブジェクトが既にあります –

+0

ちょっと@DJElbow、 "テーブル 'table1'が既に存在する"例外を取得します。いつwrite.mode(SaveMode.Append)。私はチェックして、 'root'ユーザーを使用しているときはうまくいっていますが、CREATE/INSERT/UPDATE権限を持つユーザーを使用しているときにこのエラーが発生しています。 – marnun

3

insertIntoJDBCのドキュメントが実際に間違っています;彼らはテーブルが既に存在しなければならないことを言うが、それがない場合は、上記を参照できるよう、実際に、それは、エラーがスローされます:

https://github.com/apache/spark/blob/03cca5dce2cd7618b5c0e33163efb8502415b06e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L264

私たちは、あなたが見つけることができる私たちのMemSQLスパークコネクタを、使用することをお勧めしますここに:

https://github.com/memsql/memsql-spark-connector

あなたのコードでは、そのライブラリとインポートcom.memsql.spark.connector._が含まれている場合、あなたはMemSQLにあなたのデータフレームを保存するためにdf.saveToMemSQL(...)を使用することができます。あなたはここに私達のコネクタのドキュメントを見つけることができます。

http://memsql.github.io/memsql-spark-connector/latest/api/#com.memsql.spark.connector.DataFrameFunctions

+0

非常に良い。それは物事を単純化する。どこかでダウンロードできるコンパイル済みのjarファイルはありますか?見つけにくい。 – DJElbow

+1

maven.memsql.comをリゾルバとして追加する場合は、プロジェクトに依存関係として含めることができます。 https://github.com/memsql/memsql-spark-connector#using –

1

私は同じ問題がありました。 1.6.2へのスパークバージョンの更新はうまく機能しました