BSONダンプを解析するスクリプトがありますが、圧縮されていないファイルでのみ動作します。 gz bsonファイルを読みながら空のRDDを取得します。PySpark:gzipped BSONファイルの読み込み時に空のRDD
pyspark_location = 'lib/pymongo_spark.py'
HDFS_HOME = 'hdfs://1.1.1.1/'
INPUT_FILE = 'big_bson.gz'
class BsonEncoder(JSONEncoder):
def default(self, obj):
if isinstance(obj, ObjectId):
return str(obj)
elif isinstance(obj, datetime):
return obj.isoformat()
return JSONEncoder.default(self, obj)
def setup_spark_with_pymongo(app_name='App'):
conf = SparkConf().setAppName(app_name)
sc = SparkContext(conf=conf)
sc.addPyFile(pyspark_location)
return sc
def main():
spark_context = setup_spark_with_pymongo('PysparkApp')
filename = HDFS_HOME + INPUT_FILE
import pymongo_spark
pymongo_spark.activate()
rdd = spark_context.BSONFileRDD(filename)
print(rdd.first()) #Raises ValueError("RDD is empty")
私はモンゴ-javaのドライバ-3.2.2.jar、モンゴ-Hadoopの-火花1.5.2.jar、pymongo-3.2.2-py2.7-のlinux-x86_64版とpymongo_sparkでを使用していますspark-submitと一緒に。 展開されたSparkのバージョンは、Hadoop 2.6.4とともに1.6.1です。
私はライブラリが圧縮されたBSONファイルの分割をサポートしていないことを認識していますが、分割は1つで済むはずです。 私は、分析するために圧縮されたBSONファイルが何百もあり、それらのそれぞれを実行可能なオプションではないように思われます。
どのようにすればよいでしょうか? ありがとうございます!