SparkとScalaが新しくなりました。私はこのような文書でMongoのコレクションを持っている:Spark Scala flatmap with bson document Mongoからのサブ文書付き
{
"_id": "doc_1",
"posts": {
"a": { "total": 1 },
"b": { "total": 2 }
}
}
私は平らにflatMap(または別の適切な機能)を使用したいと思い、この
val rc = ReadConfig(Map("collection" -> "my_collection"), Some(ReadConfig(sparkSession)))
val rdd = MongoSpark.load(sparkContext, rc)
のようなスパークRDDにこれをロードしていますこのような新しいRDDへの投稿のサブ文書:
|--------|---------|-------|
| doc_id | post_id | total |
|--------|---------|-------|
| doc_1 | a | 1 |
| doc_1 | b | 2 |
| doc_2 | ... | ... |
|--------|---------|-------|
(私はRDDではなく、データフレームを使用している文書が大きく、これはあまりメモを使用するように表示されますので、 ry)。
flatMapの署名はflatMap[U](f: (T) => TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]
です。 RDD内の各オブジェクトはMongoのコネクタからorg.bson.Document
ですので、私のような何か書きたい:
val newRdd = rdd.flatMap({ x: org.bson.Document => { x.posts }})
をしかし、これは与える:
値ポストはorg.bson.Documentのメンバーではありません
私は多くのGoogle検索を行っています。このようなシンプルなはずだが、私はそれを把握することはできません。あなたは正しい方向に私を向けることができますか?
あなたは爆発機能を試しましたか? –