2017-03-07 8 views
3

私はRDDデータをMongoDBに書き込むSparkアプリケーションを持っていますが、私はMongoBulkWriteExceptionを取得しています。以前は、MongoSparkドライバからwrite()メソッドを使用し始めましたが、私はbulkWrite()メソッドをMongoDB標準ドライバのから使用していました。MongoSparkは重複したキーエラーを保存しますE11000

何か前に、私はApacheのスパーク1.6.0MongoDBの3.2.11を使用しています。

この例外トレース:

com.mongodb.MongoBulkWriteException: Bulk write operation error on server 
10.1.101.146:27017. Write errors: [BulkWriteError{index=0, code=11000, 
message='E11000 duplicate key error collection: collection-test 
index: _id_ dup key: { : "636253651-2017-03-07" }', details={ }}] 

それは生成コード:

JavaRDD<Document> rddInsertRecords = rddGrouped.map(new Function<Tuple2<String, BasicRecord>, Document>() { 
private static final long serialVersionUID = 1L; 
    @Override 
    public Document call(Tuple2<String, BasicRecord> tuple2) throws Exception { 
      Document json = tuple2._2.toBSONDocument(); 
      return json; 
     } 
}); 
MongoSpark.save(rddInsertRecords, WriteConfig.create(sc.getConf())); 

私は私の古いコードを使用して別の解決策を持っていますが、私はMongoSparkを使用して書きたいです。

MongoDBのJIRA(https://jira.mongodb.org/browse/SERVER-14322)でこの問題を確認しましたが、この問題を回避する方法についてはわかりません。

更新日:初めて障害が発生しないことを忘れました(つまり、mongodbのデータがありません。コレクションは空です)。ジョブを2回目に実行すると失敗します。技術的にはドライバーはアップセートをするべきです、そうですか?

答えて

2

アップスパートコネクタは、RDD<T>の方法を知りません。ここで、Tはどのタイプでもかまいません。どのようにID値を取得できますか?

ただし、データセット/データフレームには、どのフィールドが_idフィールドであるかを示すスキーマ情報があり、アップセルに自動的に使用できます。これはSPARK-66で行われました。データセット/データフレームのもう1つの利点は、より効率的でSparkの仕事にパフォーマンスを向上させることです。

RDDを使用する必要がある場合は、プログラムでMongoDBコレクションにアクセスし、MongoConnectorクラスを使用してupsert操作を作成できます。

+0

Roger that。現時点でRDDをデータセットに切り替えることができないので、MongoConnectorのアプローチを使用すると思います。説明をありがとう。 – cabreracanal

関連する問題