2017-06-26 6 views
1

SparkとScalaが新しくなりました。私はこのような文書でMongoのコレクションを持っている:Spark Scala flatmap with bson document Mongoからのサブ文書付き

{ 
    "_id": "doc_1", 
    "posts": { 
     "a": { "total": 1 }, 
     "b": { "total": 2 } 
    } 
} 

私は平らにflatMap(または別の適切な機能)を使用したいと思い、この

val rc = ReadConfig(Map("collection" -> "my_collection"), Some(ReadConfig(sparkSession))) 
val rdd = MongoSpark.load(sparkContext, rc) 

のようなスパークRDDにこれをロードしていますこのような新しいRDDへの投稿のサブ文書:

|--------|---------|-------| 
| doc_id | post_id | total | 
|--------|---------|-------| 
| doc_1 | a  | 1  | 
| doc_1 | b  | 2  | 
| doc_2 | ...  | ... | 
|--------|---------|-------| 

(私はRDDではなく、データフレームを使用している文書が大きく、これはあまりメモを使用するように表示されますので、 ry)。

flatMapの署名はflatMap[U](f: (T) => TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]です。 RDD内の各オブジェクトはMongoのコネクタからorg.bson.Documentですので、私のような何か書きたい:

val newRdd = rdd.flatMap({ x: org.bson.Document => { x.posts }}) 

をしかし、これは与える:

値ポストはorg.bson.Documentのメンバーではありません

私は多くのGoogle検索を行っています。このようなシンプルなはずだが、私はそれを把握することはできません。あなたは正しい方向に私を向けることができますか?

+1

あなたは爆発機能を試しましたか? –

答えて

3

JavaScriptではありません:)クラス内のフィールドのみを使用する必要があります。 JavaScript表記は許可されていません。

私が見ることができるよう、Documentには、使用可能な機能getを、持っている:

代わりに、あなたの転換を投稿する必要があります // do something
case class Post (///... 
val newRdd = rdd.flatMap({ x: org.bson.Document => { x.get("posts", Post)// do something }}) 

関連する問題