1

リモートmongodbコレクションにspark sqlデータフレームを挿入しようとしています。 以前はMongoClientを使ってJavaプログラムを書いて、リモートコレクションがアクセス可能かどうかを確認しました。Spark Dataframe to MongoDBドキュメント挿入問題

マイ本スパークコードは以下の通りである -

scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 
warning: there was one deprecation warning; re-run with -deprecation for details 
sqlContext: org.apache.spark.sql.hive.HiveContext = [email protected] 
scala> val depts = sqlContext.sql("select * from test.user_details") 
depts: org.apache.spark.sql.DataFrame = [user_id: string, profile_name: string ... 7 more fields] 
scala> depts.write.options(scala.collection.Map("uri" -> "mongodb://<username>:<pwd>@<hostname>:27017/<dbname>.<collection>")).mode(SaveMode.Overwrite).format("com.mongodb.spark.sql").save() 

THSは、次のエラーを与えている -

java.lang.AbstractMethodError: com.mongodb.spark.sql.DefaultSource.createRelation(Lorg/apache/spark/sql/SQLContext;Lorg/apache/spark/sql/SaveMode;Lscala/collection/immutable/Map;Lorg/apache/spark/sql/Dataset;)Lorg/apache/spark/sql/sources/BaseRelation; 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) 
    ... 84 elided 

Iはまた、以下にエラースローされ、次の試み:

scala> depts.write.options(scala.collection.Map("uri" -> "mongodb://<username>:<pwd>@<host>:27017/<database>.<collection>")).mode(SaveMode.Overwrite).save() 
java.lang.IllegalArgumentException: 'path' is not specified 
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$17.apply(DataSource.scala:438) 
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$17.apply(DataSource.scala:438) 
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) 
    at org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.getOrElse(ddl.scala:117) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:437) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) 
    ... 58 elided 

私は以下のパッケージをインポートしました -

import org.apache.spark.{SparkConf, SparkContext} 

import org.apache.spark.sql.SQLContext 

import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern} 

import com.mongodb.spark.config._ 

import org.apache.spark.sql.hive.HiveContext 

import org.apache.spark.sql._ 

depts.show()は予想通りに動作しています。データフレームは正常に作成されます。

私にこれに関するアドバイスや提案をお願いします。あなたは以下のようなデータフレームSQLを保存することができ、あなたがMongoDB Spark Connector v1.0のを使用していると仮定すると おかげ

答えて

1

:詳細については

// DataFrames SQL example 
df.registerTempTable("temporary") 
val depts = sqlContext.sql("select * from test.user_details") 
depts.show() 
// Save out the filtered DataFrame result 
MongoSpark.save(depts.write.option("uri", "mongodb://hostname:27017/database.collection").mode("overwrite")) 

はMongoDBのの簡単なデモについてMongoDB Spark Connector: Spark SQL

を参照し、ドッキングウィンドウを使用してスパークMongoDB Spark Docker: examples.scala - dataframes

関連する問題