spark

2017-02-21 4 views
3

を使用してスカラーリストをmongodbに永続化する方法mongodbからいくつかの文書をフェッチするsparkコードを持っています。変換を行い、mongodbに戻して保存しようとします。spark

私は、次の機能を使用してListオブジェクトを永続化しようとすると問題が起こる:

まず私は、この機能を使用して、いくつかのタプルを生成します。

val usersRDD = rdd.flatMap(breakoutFileById).distinct().groupByKey().mapValues(_.toList) 

その後、私が使用してドキュメントへのタプルのフィールドを変換します私は、次のエラーmessagを取得しています

usersRDD.map(mapToDocument).saveToMongoDB() 

:カスタムmapToDocument機能とsaveToMongoDB関数を呼び出しますE:私はリストを削除する場合mapToDocument機能に(文書内のフィールドとして入れない)

org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class scala.collection.immutable.$colon$colon. 
    at org.bson.codecs.configuration.CodecCache.getOrThrow(CodecCache.java:46) 
    at org.bson.codecs.configuration.ProvidersCodecRegistry.get(ProvidersCodecRegistry.java:63) 
    at org.bson.codecs.configuration.ChildCodecRegistry.get(ChildCodecRegistry.java:51) 
    at org.bson.codecs.DocumentCodec.writeValue(DocumentCodec.java:174) 
    at org.bson.codecs.DocumentCodec.writeMap(DocumentCodec.java:189) 
    at org.bson.codecs.DocumentCodec.encode(DocumentCodec.java:131) 
    at org.bson.codecs.DocumentCodec.encode(DocumentCodec.java:45) 
    at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63) 
    at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29) 
    at com.mongodb.connection.InsertCommandMessage.writeTheWrites(InsertCommandMessage.java:101) 
    at com.mongodb.connection.InsertCommandMessage.writeTheWrites(InsertCommandMessage.java:43) 
    at com.mongodb.connection.BaseWriteCommandMessage.encodeMessageBodyWithMetadata(BaseWriteCommandMessage.java:129) 
    at com.mongodb.connection.RequestMessage.encodeWithMetadata(RequestMessage.java:160) 
    at com.mongodb.connection.WriteCommandProtocol.sendMessage(WriteCommandProtocol.java:212) 
    at com.mongodb.connection.WriteCommandProtocol.execute(WriteCommandProtocol.java:101) 
    at com.mongodb.connection.InsertCommandProtocol.execute(InsertCommandProtocol.java:67) 
    at com.mongodb.connection.InsertCommandProtocol.execute(InsertCommandProtocol.java:37) 
    at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:159) 
    at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:286) 
    at com.mongodb.connection.DefaultServerConnection.insertCommand(DefaultServerConnection.java:115) 
    at com.mongodb.operation.MixedBulkWriteOperation$Run$2.executeWriteCommandProtocol(MixedBulkWriteOperation.java:455) 
    at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:646) 
    at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:401) 
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:179) 
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168) 
    at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:230) 
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:221) 
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168) 
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74) 
    at com.mongodb.Mongo.execute(Mongo.java:781) 
    at com.mongodb.Mongo$2.execute(Mongo.java:764) 
    at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:323) 
    at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:311) 
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:132) 
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:132) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:742) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) 
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:132) 
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:131) 
    at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186) 
    at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184) 
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) 
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) 
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154) 
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171) 
    at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184) 
    at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:131) 
    at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

、すべての作品。私はすでにインターネット上で同様の問題を探していましたが、私はそれに合った解決策を見つけることができませんでした。 誰かがそれを解決する手掛かりを持っていますか?ドキュメントのunsupported typesセクションから事前

答えて

3

ありがとう:

一部Scalaの種類(例えば、リスト)がサポートされていないため、自分のJavaと同等に を変換する必要があります。 Scalaからネイティブタイプに変換するには .asJavaメソッドを使用するには、次のimport文を含めます。この文脈ではスカラ非同期ドライバを使用しない点をtheresのよう

import scala.collection.JavaConverters._ 
import org.bson.Document 

val documents = sc.parallelize(
    Seq(new Document("fruits", List("apples", "oranges", "pears").asJava)) 
) 
MongoSpark.save(documents) 

それらがサポートされていない理由は、下にMongoのJavaドライバを使用して、Mongoのスパークコネクタによるものです。ただし、サポートされているJavaの種類にマップする必要があるRDDを意味します。データセットを使用すると、これらのコンバージョンは自動的に実行されます。

関連する問題