2017-12-24 27 views
0

私は234百万のレコードでSparkのMongoDBでコレクションを読み込もうとしています。私は唯一のフィールドが欲しい。エラー:com.mongodb.MongoCursorNotFoundException:MongoDBからSparkへの読み込み中

case class Linkedin_Profile(experience : Array[Experience]) 
case class Experience(company : String) 

val rdd = MongoSpark.load(sc, ReadConfig(Map("uri" -> mongo_uri_linkedin))) 
val company_DS = rdd.toDS[Linkedin_Profile]() 
val count_udf = udf((x: scala.collection.mutable.WrappedArray[String]) => {x.filter(_ != null).groupBy(identity).mapValues(_.size)}) 
val company_ColCount = company_DS.select(explode(count_udf($"experience.company"))) 
comp_rdd.saveAsTextFile("/dbfs/FileStore/chandan/intermediate_count_results.csv") 

ジョブが完了したジョブの半分で1時間実行されますが、それは誤りに

com.mongodb.MongoCursorNotFoundException: 
Query failed with error code -5 and error message 
'Cursor 8962537864706894243 not found on server cluster0-shard-01-00-i7t2t.mongodb.net:37017' 
on server cluster0-shard-01-00-i7t2t.mongodb.net:37017 

を与えることをした後、私は以下で構成を変更しようとしたが、無駄に。

System.setProperty("spark.mongodb.keep_alive_ms", "7200000") 

この大きなコレクションの読み方をお勧めします。

答えて

0

configプロパティpark.mongodb.keep_alive_msは、クライアントの寿命を制御するためのものです。 docs hereを参照してください。

問題は、サーバー側の設定に関連しているようです。 on this issueを文書化しているものによると:

By specifing the cursorTimeoutMillis option, administrators can configure mongod or mongos to automatically remove idle client cursors after a specified interval. The timeout applies to all cursors maintained on a mongod or mongos, may be specified when starting the mongod or mongos and may be modified at any time using the setParameter command.

だから、のような、指定された cursorTimeoutMillisであなた mongodデーモンを起動してみてください:

mongod --setParameter cursorTimeoutMillis=10800000 

このコマンドは、3時間有効のカーソルを維持するために、サーバーに指示しようとします。

これは理論上は煩わしさを解消することができますが、読み取りをより速く完了させることはまだ良い考えです。コレクションに格納されているデータセットを実際にSparkにロードしたいものに限定することを検討したい場合があります。調査する価値のある読取り速度を調整するための多くのオプションがあります。

関連する問題