2017-05-16 11 views
1

私はMongoDBでPySparkを使用しており、日付フィルタを使用してパイプラインを使用してデータベースをクエリしたいと考えています。 はMongoのでは、私のクエリは、次のようになります。PySpark MongoDBクエリの日付

db.collection.aggregate([{$match:{"creation":{$lte:new Date("Jan 1, 2016")}}},{$sort:{"creation":1}}]) 

しかし、私はPythonで同じことを行う方法がわかりません。例えば、私が試した:

pipeline = [{'$match': {'creation': {'$lte': datetime.datetime(2016, 1, 1, 0, 0)}}}, {'$sort': {'creation': 1}}] 
df = context.read.format("com.mongodb.spark.sql").options(pipeline=pipeline).load() 

と私はエラーを得た:org.bson.json.JsonParseExceptionを:JSONリーダーが値を期待していたが、'日時'を見つけました。

(私はSQLクエリのパイプラインやないですべてをやりたい)

答えて

0

あなたは、日付を指定するMongoDB extended JSONを利用することができます。例:

pipeline = [{'$match':{'creation':{'$lte': {'$date': "2016-01-01T00:00:00Z" }}}}] 
df_pipeline = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") 
          .option("pipeline", pipeline).load() 
df_pipeline.first()