2017-10-18 3 views
2

したがって、ID列を持つSql Serverにテーブルがあり、ID列です。私が直面している問題は、データフレームをデータフレームにプッシュしようとすると、identity_insertが 'off'に設定されているという不満があることです。今、私は明示的にjdbcを使って 'on'に設定していますが、これはSQL Server側のセッション変数であるため、dataframe pushコマンドがヒットした時点で 'off'に戻ります。サーバ。set_identityがオフの場合、SparkデータフレームをSQL Serverテーブルにプッシュする方法はありますか?

「オン」にしてデータフレームを同じセッションでプッシュする方法はありますか?

いくつかのコード - SQL Serverのテーブル

create table dbo.testtable 
(
[Id] int identity, 
[Name] varchar(100), 
[Address] varchar(100), 
[ExtraColumn] int, 
[Age] int 
) 

マイデータフレーム - 私は、データフレームからId列を削除する場合は、上記のすべて正常に動作することを

case class TestClass(Id: Int, Name: String, Address: String, ExtraColumn: 
Int, Age: Int) 

val seqClass = Seq(TestClass(1, "kv", "riata", 2, 30), 
       TestClass(2, "xyz", "xyz's place", 2, 31), 
       TestClass(3, "abc", "abc's place", 2, 32)) 

val sparkSession = createSparkSession //creating through some method 
val df = sparkSession.sqlContext.createDataFrame(seqClass) 
JDBCUtils.setIdentityInsertOn(conn, JDBC.SQL_SERVER.TYPE, 
"testdb1.dbo.testtable", None) //my method to turn on identity_insert 

//code to push data frame to sql server 
df.coalesce(1).write.mode("append").jdbc(jdbcUrl,"testdb1.dbo.testtable", 
getConnectionProperties(username,password, dbType)) 

//getConnectionProperties is my own method that provides connection 
//properties for jdbc. 

注意。コード全体が機能するので、データフレーム上でIdを維持し、テストテーブルにプッシュできる必要があります。なぜ私は単にテストテーブルからID生成を使用できないのですか?上記のコードは非常に複雑なワークフローの一部なので、データフレームに上記のようなID列を生成する必要があります。

ご協力いただきましてありがとうございます。

おかげで、すでにソリューションを実装していたバイバブオフラインに出て到達した後、この問題を解決することが

答えて

1

。私は他の人が将来使用するために同じものをここに掲載しています。

、場所に以下でSaveTable(のローカルコピーを作成)とJDBCUtils.javaから従属関数 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

コードの行の下に挿入する更新SavePartition機能 - ライン640前

if(Identity_Insert_Off) { 
    val sql = "set IDENTITY_INSERT " + table + " ON"; 
    val statement = conn.createStatement() 
    statement.execute(sql) 
} 

whileループ。

while (iterator.hasNext) {...} 

更新クエリ以下

はアイデンティティを確認するために使用することができます(私は関数に渡されたので、唯一のアイデンティティを挿入するフラグをチェックするだけのSQLServerのために、このコードを使用しています)シナリオに基づく条件場合はOnですまたは特定のテーブルのオフ -

SELECT OBJECTPROPERTY(OBJECT_ID('<TableName>'), 'TableHasIdentity'); 
関連する問題