MongoDB Connector for HashopをSparkと共に使用して、MongoDBの1つのコレクションを照会し、取り出されたすべてのドキュメントを別のコレクションにアップアップしようとしています。 MongoUpdateWritableクラスは、MongoDBのコレクションを更新するRDDの値に使用され、upsertフラグを持ちます。残念ながら、upsertフラグは実行に影響しないようです。あたかもupsertフラグがfalseに設定されているかのように、コードはエラーなしで実行されています。mongo-hadoopパッケージupsertとsparkが動作していないようです
このコードは、ローカルホストのmongodプロセスに接続し、mongoクライアントを使用してデータを書き込み、そのデータを読み込み、sparkを使用して同じデータベース内の別のコレクションに書き込もうとします。書き込みが完了しないと、コードは同じIDを持つmongoクライアントを介してターゲットテーブルに文書を書き込み、同じsparkジョブを実行してupsertの更新部分が正しく動作することを示します。
スパークバージョン:1.6.0-cdh5.7.0
Hadoopのバージョン:2.6.0-cdh5.4.7
Mongoのバージョン:3.2.0
Mongoの-Hadoopのコア・バージョン:2.0 .2
import com.mongodb.client.{FindIterable, MongoCollection, MongoDatabase}
import com.mongodb.{BasicDBObject, DBCollection, MongoClient}
import com.mongodb.hadoop.io.MongoUpdateWritable
import org.apache.hadoop.conf.Configuration
import org.bson.{BSONObject, BasicBSONObject, Document}
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object sparkTest extends App {
//setting up mongo
val mongo: MongoDatabase = new MongoClient("localhost",27017).getDatabase("test")
var source: MongoCollection[Document] = mongo.getCollection("source")
val target: MongoCollection[Document] = mongo.getCollection("target")
source.drop()
target.drop()
//inserting document
val sourceDoc = new Document()
sourceDoc.put("unchanged","this field should not be changed")
sourceDoc.put("_id","1")
source.insertOne(sourceDoc)
//setting up spark
val conf = new SparkConf().setAppName("test mongo with spark").setMaster("local")
val mongoConfig = new Configuration()
val sc = new SparkContext(conf)
mongoConfig.set("mongo.input.uri",
"mongodb://localhost:27017/test.source")
mongoConfig.set("mongo.output.uri",
"mongodb://localhost:27017/test.target")
//setting up read
val documents = sc.newAPIHadoopRDD(
mongoConfig, // Configuration
classOf[MongoInputFormat], // InputFormat
classOf[Object], // Key type
classOf[BSONObject]) // Value type
//building updates with no document matching the query in the target collection
val upsert_insert_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
(value: BSONObject) => {
val query = new BasicBSONObject
query.append("_id", value.get("_id").toString)
val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
update.append("added","this data will be added")
println("val:"+value.toString)
println("query:"+query.toString)
println("update:"+update.toString)
new MongoUpdateWritable(
query, // Query
update, // Update
true, // Upsert flag
false, // Update multiple documents flag
true // Replace flag
)}
)
//saving updates
upsert_insert_rdd.saveAsNewAPIHadoopFile(
"",
classOf[Object],
classOf[MongoUpdateWritable],
classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
mongoConfig)
// At this point, there should be a new document in the target database, but there is not.
val count = target.count()
println("count after insert: "+count+", expected: 1")
//adding doc to display working update. This code will throw an exception if there is a
//document with a matching _id field in the collection, so if this breaks that means the upsert worked!
val targetDoc = new Document()
targetDoc.put("overwritten","this field should not be changed")
targetDoc.put("_id","1")
target.insertOne(targetDoc)
//building updates when a document matching the query exists in the target collection
val upsert_update_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
(value: BSONObject) => {
val query = new BasicBSONObject
query.append("_id", value.get("_id").toString)
val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
update.append("added","this data will be added")
println("val:"+value.toString)
println("query:"+query.toString)
println("update:"+update.toString)
new MongoUpdateWritable(
query, // Query
update, // Update
true, // Upsert flag
false, // Update multiple documents flag
true // Replace flag
)}
)
//saving updates
upsert_update_rdd.saveAsNewAPIHadoopFile(
"",
classOf[Object],
classOf[MongoUpdateWritable],
classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
mongoConfig)
//checking that the update succeeded. should print:
//contains new field:true, contains overwritten field:false
val ret = target.find().first
if (ret != null)
println("contains new field:"+ret.containsKey("added")+", contains overwritten field:"+ret.containsKey("overwritten"))
else
println("no documents found in target")
}
私が紛失しているものについての洞察は役に立ちます。出力形式をMongoUpdateWritableに変更しようとしましたが、動作に影響はありませんでした。これはおそらく設定上の問題ですが、入力と出力の両方の形式とMongoUpdateWritableクラスを使って文書を書いても、文書を読み書きすることができたので、mongo hadoopアダプタのバグのようです。便宜上
POM:既存の文書を交換し、アップサートます_idフィールドが含まれているのMongoDBへ
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>test</groupId>
<artifactId>spark_mongo_upsert_test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spark.version>1.6.0-cdh5.7.0</spark.version>
<mongo.version>3.2.0</mongo.version>
<mongo.hadoop.version>2.0.2</mongo.hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb.mongo-hadoop</groupId>
<artifactId>mongo-hadoop-core</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.mongodb.</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>${mongo.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Plugin to compile Scala code -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.1</version>
</plugin>
</plugins>
</build>
</project>