2016-12-05 18 views
2

Spark Datasetを既存のpostgresqlテーブルに書き込もうとしています(カラムタイプのようなテーブルのメタデータを変更できません)。この表の列の1つはタイプHStoreであり、問​​題を引き起こしています。Spark Datasetを使用してPostgreSQL hstoreに書き込む方法

私は(エスケープが空の文字列を与えたときに、ここで元のマップがどの空である)の書き込みを起動したとき、私は次の例外を参照してください。

Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO part_d3da09549b713bbdcd95eb6095f929c8 (.., "my_hstore_column", ..) VALUES (..,'',..) was aborted. Call getNextException to see the cause. 
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:136) 
    at org.postgresql.core.v3.QueryExecutorImpl$1.handleError(QueryExecutorImpl.java:419) 
    at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleError(QueryExecutorImpl.java:308) 
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2004) 
    at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1187) 
    at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1212) 
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:351) 
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:1019) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:222) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:300) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:299) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.postgresql.util.PSQLException: ERROR: column "my_hstore_column" is of type hstore but expression is of type character varying 

これは私がそれをやっている方法です:

def escapePgHstore[A, B](hmap: Map[A, B]) = { 
    hmap.map{case(key, value) => s""" "${key}"=>${value} """}.mkString(",") 
} 
... 
val props = new Properties() 
props.put("user", "xxxxxxx") 
props.put("password", "xxxxxxx") 

ds.withColumn("my_hstore_column", escape_pg_hstore_udf($"original_column")) 
    .drop("original_column") 
    .coalesce(1).write 
    .mode(org.apache.spark.sql.SaveMode.Append) 
    .option("driver", "org.postgresql.Driver") 
    .jdbc(jdbcUrl, hashedTablePartName, props) 

私はescapePgHstoreを使用して文字列に地図[文字列、ロング]からoriginal_columnをエスケープしていない場合、私は次のエラーを参照してください。

java.lang.IllegalArgumentException: Can't get JDBC type for map<string,bigint> 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType(JdbcUtils.scala:136) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:293) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:292) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:292) 
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:441) 
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34) 
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
    at scala.App$$anonfun$main$1.apply(App.scala:76) 
    at scala.App$$anonfun$main$1.apply(App.scala:76) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 
    at scala.App$class.main(App.scala:76) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

sparkに有効なhstoreデータ型を書き込む正しい方法は何ですか?

答えて

2

私はちょうどポストグルが私の列の適切なタイプを推測しようとしていることが判明しました。 official documentationに記載されているように、接続文字列にstringtypeunspecifiedを設定します。

props.put("stringtype", "unspecified") 

これで完全に動作します。

+1

これは私のために素晴らしい仕事!あなたは私に****の時間を節約しました、そして、これは私がこのトピックで見つけることができた唯一の情報でした。つまり、私はもう一つ重要な部分を見つけました:あなたが書いている 'hstore'列は、すでに存在していなければなりません。 Sparkが使用している 'SaveMode'が' 'overwrite ''に設定されていると、Postgresはテキストを' 'hstore''カラムに解析しようとする機会を得ることはありません。 SparkはPostgresに 'text'カラムを伝えるだけです。 – mtrewartha

0

これは、HSTORE JSON列とJSONB列を持つPostgresテーブルにデータフレームを書き込むためのpysparkコードです。したがって、Spark Dataframeで作成できないPostgresで作成された複雑なデータ型の場合は、オプションまたはデータフレームからSQLへの書き込み関数に設定するプロパティでstringtype="unspecified"を指定する必要があります。以下は

write()機能を使用してPostgreSQLのテーブルにスパークデータフレームを書き込む例です:

dataframe.write.format('jdbc').options(driver=driver,user=username,password=password, url=target_database_url,dbtable=table, stringtype="unspecified").mode("append").save() 
関連する問題