2016-08-09 12 views
2

私のアプリケーションは、MongoDBをプラットフォームとして構築されています。 DB内の1つのコレクションには膨大な量のデータがあり、計算によって分析データを取得して生成するためにapache sparkが選択されています。 Spark Connector for MongoDBをMongoDBと通信するように設定しました。 pysparkを使用してMongoDBコレクションをクエリし、mongodbクエリの結果セットで構成されるデータフレームを構築する必要があります。 私に適切な解決策を提案してください。MongoDBからフィルタリングされたレコードを含むSparkデータフレームを構築するには?

+0

なぜあなたは[Stratioコネクタ](https://github.com/Stratio/をしようとしないでくださいSpark-MongoDB)?このコネクターはデータフレームを直接返します –

+0

@JohnZeng以下はstratioコネクターを使用して実装したコードスニペットです。 (ホスト= 'localhost:27017'、データベース= 'mydb'、コレクション= 'mycoll')。load() –

+0

あなたはplsを編集できますか?あなたの質問とスニペットを貼り付けますか?私はあなたがこれを呼んだ後ですでにデータフレームを取得していると思います。あなたの質問がMongoDBのコネクタにリンクしているので、今何を望んでいるのか混乱しています。 –

答えて

5

あなたがそうのように、直接データフレームにデータをロードすることができます。詳細は

# Create the dataframe 
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/mydb.mycoll").load() 

# Filter the data via the api 
df.filter(people.age > 30) 

# Filter via sql 
df.registerTempTable("people") 
over_thirty = sqlContext.sql("SELECT name, age FROM people WHERE age > 30") 

はモンゴスパークコネクタPython APIセクションまたはintroduction.pyを参照してください。 SQLクエリは変換され、コネクターに返され、MongoDBでデータを照会してからsparkクラスターに送ることができます。

あなたはまた、スパークに結果を返す前に、コレクションに適用するために、独自のaggregation pipelineを提供することができます。

dfr = sqlContext.read.option("pipeline", "[{ $match: { name: { $exists: true } } }]") 
df = dfr.option("uri", ...).format("com.mongodb.spark.sql.DefaultSource").load() 
+0

ありがとうございます@ロスです。しかし、データフレーム全体にフィルタを適用するのではなく、データベースクエリにフィルタを直接適用する必要があります。 –

+0

これはコレクションのクエリに変換され、コネクタは戻りますフィルタリングされた結果。 – Ross

+0

コードスニペットで詳細を教えてください –

関連する問題