0
MLモジュールを使って分類器を構築するために、Sparkを研究しています。MongoDBからpySparkに行列をロード/逆シリアル化し、新しいベクトル列を追加します。
私はこの作業にPandasをうまく使用しましたが、データ量が増えましたが、今はRAMに収まりません。私はDaskを使って肯定的な経験もしていますが、その機械学習ライブラリは生産の準備ができていません。
私のデータはMongoDBに保存され、小さな画像を含み、cPickle
でシリアライズされています。私はpyspark.ml.linalg
モジュールで定義された分類は、ベクトルや行列と連携MLのドキュメントから理解しました
import os
import numpy as np
import pymongo
from bson.binary import Binary
records = []
for file_path in file_paths:
for r in file(fn):
normalized_image = np.random.rand(120, 40)
this_result = {'file_name': os.path.basename(file_path),
'normalized_image' : Binary(cPickle.dumps(normalized_image, protocol=2)),
# other data
}
records.append(this_result)
client = pymongo.MongoClient(MONGO_CREDENTIALS)
db_name = 'database_name'
client.drop_database(db_name)
database = client[db_name]
collection = database['data_sample']
collection.insert_many(records)
:ここ
は、彼らの創造のために切り取らコードです。公式のMongoDB Sparkコネクタを使用して、データベースからSpark DataFrameにデータを読み取ることができました。 はしかし、彼らはまだシリアライズさ:
from pyspark.sql import SparkSession
import pyspark.ml as ml
spark = SparkSession \
.builder \
.appName("sparktest") \
.config("spark.mongodb.input.uri", "mongodb://mongo.server/database_name.data_sample") \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load().drop('_id')
df.printSchema()
root
|-- file_name: string (nullable = true)
|-- normalized_image: binary (nullable = true)
|-- parea: double (nullable = true)
|-- sns: double (nullable = true)
....
は、私はそれらをデシリアライズするにはどうすればよいですか?
また、これらの各画像のヒストグラムを計算し、結果のデータフレームの新しい列として保存する必要があります。