2017-03-07 6 views
0

MongoDB Connector for HashopをSparkと共に使用して、MongoDBの1つのコレクションを照会し、取り出されたすべてのドキュメントを別のコレクションにアップアップしようとしています。 MongoUpdateWritableクラスは、MongoDBのコレクションを更新するRDDの値に使用され、upsertフラグを持ちます。残念ながら、upsertフラグは実行に影響しないようです。あたかもupsertフラグがfalseに設定されているかのように、コードはエラーなしで実行されています。mongo-hadoopパッケージupsertとsparkが動作していないようです

このコードは、ローカルホストのmongodプロセスに接続し、mongoクライアントを使用してデータを書き込み、そのデータを読み込み、sparkを使用して同じデータベース内の別のコレクションに書き込もうとします。書き込みが完了しないと、コードは同じIDを持つmongoクライアントを介してターゲットテーブルに文書を書き込み、同じsparkジョブを実行してupsertの更新部分が正しく動作することを示します。

スパークバージョン:1.6.0-cdh5.7.0

Hadoopのバージョン:2.6.0-cdh5.4.7

Mongoのバージョン:3.2.0

Mongoの-Hadoopのコア・バージョン:2.0 .2

import com.mongodb.client.{FindIterable, MongoCollection, MongoDatabase} 
import com.mongodb.{BasicDBObject, DBCollection, MongoClient} 
import com.mongodb.hadoop.io.MongoUpdateWritable 
import org.apache.hadoop.conf.Configuration 
import org.bson.{BSONObject, BasicBSONObject, Document} 
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.{SparkConf, SparkContext} 

object sparkTest extends App { 

    //setting up mongo 
    val mongo: MongoDatabase = new MongoClient("localhost",27017).getDatabase("test") 
    var source: MongoCollection[Document] = mongo.getCollection("source") 
    val target: MongoCollection[Document] = mongo.getCollection("target") 
    source.drop() 
    target.drop() 
    //inserting document 
    val sourceDoc = new Document() 
    sourceDoc.put("unchanged","this field should not be changed") 
    sourceDoc.put("_id","1") 
    source.insertOne(sourceDoc) 

    //setting up spark 
    val conf = new SparkConf().setAppName("test mongo with spark").setMaster("local") 
    val mongoConfig = new Configuration() 
    val sc = new SparkContext(conf) 
    mongoConfig.set("mongo.input.uri", 
    "mongodb://localhost:27017/test.source") 
    mongoConfig.set("mongo.output.uri", 
    "mongodb://localhost:27017/test.target") 

    //setting up read 
    val documents = sc.newAPIHadoopRDD(
    mongoConfig,    // Configuration 
    classOf[MongoInputFormat], // InputFormat 
    classOf[Object],   // Key type 
    classOf[BSONObject])  // Value type 

    //building updates with no document matching the query in the target collection 
    val upsert_insert_rdd: RDD[(Object, MongoUpdateWritable)] =  documents.mapValues(
    (value: BSONObject) => { 

    val query = new BasicBSONObject 
    query.append("_id", value.get("_id").toString) 

    val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject]) 
    update.append("added","this data will be added") 

    println("val:"+value.toString) 
    println("query:"+query.toString) 
    println("update:"+update.toString) 

    new MongoUpdateWritable(
    query, // Query 
    update, // Update 
    true, // Upsert flag 
    false, // Update multiple documents flag 
    true // Replace flag 
    )} 
) 
    //saving updates 
    upsert_insert_rdd.saveAsNewAPIHadoopFile(
    "", 
    classOf[Object], 
    classOf[MongoUpdateWritable], 
    classOf[MongoOutputFormat[Object, MongoUpdateWritable]], 
    mongoConfig) 

    // At this point, there should be a new document in the target database, but there is not. 
    val count = target.count() 
    println("count after insert: "+count+", expected: 1") 

    //adding doc to display working update. This code will throw an exception  if there is a 
    //document with a matching _id field in the collection, so if this breaks that means the upsert worked! 
    val targetDoc = new Document() 
    targetDoc.put("overwritten","this field should not be changed") 
    targetDoc.put("_id","1") 
    target.insertOne(targetDoc) 

    //building updates when a document matching the query exists in the target collection 
    val upsert_update_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
    (value: BSONObject) => { 

     val query = new BasicBSONObject 
     query.append("_id", value.get("_id").toString) 

     val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject]) 
     update.append("added","this data will be added") 

     println("val:"+value.toString) 
     println("query:"+query.toString) 
     println("update:"+update.toString) 

     new MongoUpdateWritable(
     query, // Query 
     update, // Update 
     true, // Upsert flag 
     false, // Update multiple documents flag 
     true // Replace flag 
    )} 
) 
    //saving updates 
    upsert_update_rdd.saveAsNewAPIHadoopFile(
    "", 
    classOf[Object], 
    classOf[MongoUpdateWritable], 
    classOf[MongoOutputFormat[Object, MongoUpdateWritable]], 
    mongoConfig) 

    //checking that the update succeeded. should print: 
    //contains new field:true, contains overwritten field:false 
    val ret = target.find().first 
    if (ret != null) 
    println("contains new field:"+ret.containsKey("added")+", contains overwritten field:"+ret.containsKey("overwritten")) 
    else 
    println("no documents found in target") 
} 

私が紛失しているものについての洞察は役に立ちます。出力形式をMongoUpdateWritableに変更しようとしましたが、動作に影響はありませんでした。これはおそらく設定上の問題ですが、入力と出力の両方の形式とMongoUpdateWritableクラスを使って文書を書いても、文書を読み書きすることができたので、mongo hadoopアダプタのバグのようです。便宜上

POM:既存の文書を交換し、アップサートます_idフィールドが含まれているのMongoDBへ

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>test</groupId> 
    <artifactId>spark_mongo_upsert_test</artifactId> 
    <version>1.0-SNAPSHOT</version> 

    <properties> 
     <spark.version>1.6.0-cdh5.7.0</spark.version> 
     <mongo.version>3.2.0</mongo.version> 
     <mongo.hadoop.version>2.0.2</mongo.hadoop.version> 
    </properties> 

    <dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.10</artifactId> 
      <version>${spark.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-sql_2.10</artifactId> 
      <version>${spark.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.mongodb.mongo-hadoop</groupId> 
      <artifactId>mongo-hadoop-core</artifactId> 
      <version>2.0.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.mongodb.</groupId> 
      <artifactId>mongo-java-driver</artifactId> 
      <version>${mongo.version}</version> 
     </dependency> 

    </dependencies> 

    <build> 
     <plugins> 
      <!-- Plugin to compile Scala code --> 
      <plugin> 
       <groupId>net.alchim31.maven</groupId> 
       <artifactId>scala-maven-plugin</artifactId> 
       <version>3.2.1</version> 
      </plugin> 
     </plugins> 
    </build> 

</project> 

答えて

関連する問題