2017-10-09 9 views
8

私はプロジェクトのMongo Sparkコネクタを評価していますが、矛盾した結果が出ています。私はMongoDBサーバーバージョン3.4.5、Spark(PySpark経由)バージョン2.2.0、Mongo Spark Connectorバージョン2.11; 2.2.0をローカルで使用しています。私のテストDBではEnronデータセットを使用しますhttp://mongodb-enron-email.s3-website-us-east-1.amazonaws.com/私はSpark SQLクエリに興味があり、カウントのための簡単なテストクエリを実行すると、実行ごとに異なるカウントを受け取りました。なぜMongo Sparkコネクタがクエリに対して異なるカウントと不正確なカウントを返すのですか?

In [1]: df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/enron_mail.messages").load() 
In [2]: df.registerTempTable("messages") 
In [3]: res = spark.sql("select count(*) from messages where headers.To='[email protected]'") 
In [4]: res.show() 
+--------+                  
|count(1)| 
+--------+ 
|  162| 
+--------+ 
In [5]: res.show() 
+--------+                  
|count(1)| 
+--------+ 
|  160| 
+--------+ 
In [6]: res = spark.sql("select count(_id) from messages where headers.To='[email protected]'") 
In [7]: res.show() 
+----------+                  
|count(_id)| 
+----------+ 
|  161| 
+----------+ 
In [8]: res.show() 
+----------+                  
|count(_id)| 
+----------+ 
|  162| 
+----------+ 

は、私はこの問題についてGoogleの中で検索が、私は役に立つ何かを見つけることができませんでした。ここに私のPySparkシェルからいくつか出力され

> db.messages.count({'headers.To': '[email protected]'}) 
203 

: はここに私のmongoシェルから出力されます。もし誰かがこれがどうして起こりうるのか、それを正しく処理するためのアイデアがあれば、あなたのアイデアを共有してください。私は何かを逃したかもしれない、あるいは何かが正しく構成されていなかったかもしれないという気持ちがあります。

更新日: 私は私の問題を解決しました。矛盾したカウントの理由は、ランダムサンプリングを使用するMongoSamplePartitionerをラップするMongoDefaultPartitionerでした。正直言って、これは私にとっては奇妙なデフォルトです。私は個人的には遅いが、一貫性のあるパーティショナーを使う方が好きです。パーティショナーオプションの詳細については、configuration optionsの公式ドキュメントを参照してください。

更新: 解決策を回答にコピーしました。

答えて

6

私は私の問題を解決しました。矛盾したカウントの理由は、ランダムサンプリングを使用するMongoSamplePartitionerをラップするMongoDefaultPartitionerでした。正直言って、これは私にとっては奇妙なデフォルトです。私は個人的には遅いが、一貫性のあるパーティショナーを使う方が好きです。パーティショナーオプションの詳細は、configuration optionsの公式文書に記載されています。

コード:

val df = spark.read 
    .format("com.mongodb.spark.sql.DefaultSource") 
    .option("uri", "mongodb://127.0.0.1/enron_mail.messages") 
    .option("partitioner", "spark.mongodb.input.partitionerOptions.MongoPaginateBySizePartitioner ") 
    .load() 
+0

'()' Scalaで必須ではありません。 – mrsrinivas

+0

@mrsrinivasああ、すみません、私はそれを知らなかったのです。前にいくつかのスカラーの例を調べましたが、すべてが.load()を使用していました。 – artemdevel

関連する問題