2016-04-20 7 views
0

私は既に多くの研究をしましたが、解決策を見つけることができませんでした。私がここで見つけることができる最も近い質問はWhy my SPARK works very slowly with mongoDBです。mongodbから効率的にデータを読み込み、それをsparkのデータフレームに変換する方法は?

mongo-hadoopコネクタを使用して、spongのDataFrameにmongodbコレクションをロードしようとしています。関連するコードのスニペットは次のとおりです。

connection_string = 'mongodb://%s:%s/randdb.%s'%(dbhost, dbport, collection_name) 
trainrdd = sc.mongoRDD(connection_string, config=config) 
#  traindf = sqlcontext.createDataFrame(trainrdd) 
#  traindf = sqlcontext.read.json(trainrdd) 
traindf = sqlcontext.jsonRDD(trainrdd) 

ここで、「sc」はSparkContextオブジェクトです。私はコード内でコメントアウトされているバリアントも試しました。しかし、すべてが同じように遅いです。サイズが2GB(100000行と1000列)のコレクションの場合、12コアと72 GB RAMを持つ3台のマシンのクラスターで(このスパーククラスター内のすべてのコアを使用して)約6時間(神聖モリ:/)になります。 Mongodbサーバは、これらのマシンの1つでも動作しています。

正しく行っているかわかりません。このコードをどのように最適化するかについての指針は本当に役に立ちます。

+0

を参照していますか?他の方法でRDDをDataFrameに変換できますか? –

+0

こんにちはワン お返事ありがとうございます。はい、実際のアクションは 'sqlcontext.jsonRDD(trainrdd)'を呼び出すと開始します。これにより、mongodbの読み込みが開始され、接続が確立され、削除されたことを示すmongodbログが記録されます。私は他の方法(上記のコードでコメントアウト)を試みましたが、これも同様に遅いです。最近、mongodbコレクションからエクスポートされたjsonファイルでsqlcontext.read.jsonを試しました。これは比較的速く比較的に機能しました。 – bitspersecond

+0

[mongodb mongo-hadoop spark connector](https://github.com/mongodb/mongo-hadoop/blob/master/spark/src/main/python/README.rst)のどのjarバージョンを使用していますか? MongoDBサーバをSparkノードから分離することはできますか? –

答えて

2

デフォルトではpyspark.sql.SQLContext.jsonRDDは、指定されたJSONデータセットのスキーマを動的に推論します。新しいJSONフィールドが見つかると、列が追加されます。これはすべてのJSON属性が検査されるにつれて遅くなる可能性があります。特に1000 coloumnsがある場合。

データがわかっている場合は、スキーマを明示的に定義することができます。または特定のフィールドセットのみが必要です。

HADOOP-277に記載されているObjectIdの問題のため、このような互換性のないタイプを含むフィールドを削除するか、他のタイプに変換する必要があります。すなわち、例えばstr(ObjectId(...))

を:

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
from pyspark.sql.types import StructType, StructField, StringType 
import pymongo_spark 
pymongo_spark.activate() 
data_rdd = sc.mongoRDD("mongodb://localhost:27017/database.collection") 
sqlcontext = SQLContext(sc) 

# Define your schema explicitly 
schema = StructType([StructField("firstname", StringType()), 
        StructField("lastname", StringType()), 
        StructField("description", StringType())]) 

# Create a mapper function to return only the fields wanted, or to convert. 
def project(doc): 
    return {"firstname": str(doc["firstname"]), 
      "lastname": str(doc["lastname"]), 
      "description": str(doc["description"])} 

projected_rdd = data_rdd.map(project) 
train_df = sqlcontext.jsonRDD(projected_rdd, schema) 
train_df.first() 

上記のスニペットは、環境でテストした:あなたは遅い `jsonRDD`の使用にSpark v1.6.1、v1.5.2 mongo-hadoop spark

+0

Hi Wanを使用しています。これはスキーマの使用についても知っています。私はそれを試してみましたが、100列では何の違いもありませんでした。私は1000カラムも試してみて、ここに結果を掲載します。ありがとう。 – bitspersecond

関連する問題