2017-04-26 10 views
0

MLモジュールを使って分類器を構築するために、Sparkを研究しています。MongoDBからpySparkに行列をロード/逆シリアル化し、新しいベクトル列を追加します。

私はこの作業にPandasをうまく使用しましたが、データ量が増えましたが、今はRAMに収まりません。私はDaskを使って肯定的な経験もしていますが、その機械学習ライブラリは生産の準備ができていません。

私のデータはMongoDBに保存され、小さな画像を含み、cPickleでシリアライズされています。私はpyspark.ml.linalgモジュールで定義された分類は、ベクトルや行列と連携MLのドキュメントから理解しました

import os 
import numpy as np 
import pymongo 
from bson.binary import Binary 

records = [] 
for file_path in file_paths: 
    for r in file(fn): 
     normalized_image = np.random.rand(120, 40) 
     this_result = {'file_name': os.path.basename(file_path), 
         'normalized_image' : Binary(cPickle.dumps(normalized_image, protocol=2)), 
         # other data 
         } 
     records.append(this_result) 


client = pymongo.MongoClient(MONGO_CREDENTIALS) 
db_name = 'database_name' 
client.drop_database(db_name) 
database = client[db_name] 
collection = database['data_sample'] 
collection.insert_many(records) 

:ここ

は、彼らの創造のために切り取らコードです。

公式のMongoDB Sparkコネクタを使用して、データベースからSpark DataFrameにデータを読み取ることができました。 はしかし、彼らはまだシリアライズさ:

from pyspark.sql import SparkSession 
import pyspark.ml as ml 

spark = SparkSession \ 
    .builder \ 
    .appName("sparktest") \ 
    .config("spark.mongodb.input.uri", "mongodb://mongo.server/database_name.data_sample") \ 
    .getOrCreate() 
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load().drop('_id') 
df.printSchema() 


root 
|-- file_name: string (nullable = true) 
|-- normalized_image: binary (nullable = true) 
|-- parea: double (nullable = true) 
|-- sns: double (nullable = true) 
.... 

は、私はそれらをデシリアライズするにはどうすればよいですか?

また、これらの各画像のヒストグラムを計算し、結果のデータフレームの新しい列として保存する必要があります。

答えて

0

今までは、私は以下のソリューション

from pyspark.sql.functions import UserDefinedFunction 

def deserialize_calc_histogram(ser_image): 
    return ml.linalg.Vectors.dense(get_histogram(data=cPickle.loads(str(ser_image)), 
           scale_factor=(4, 1.4))) 

histo = UserDefinedFunction(deserialize_calc_histogram, ml.linalg.VectorUDT()) 
encode_boolean = UserDefinedFunction(lambda b: int(b), pyspark.sql.types.IntegerType()) 


df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load().\ 
      select(['bool_label', 'is_train', 'normalized_image']) 

train = df.filter(df.is_train == True).\ 
     withColumn("norm_histogram", histo("normalized_image")).\ 
     withColumn("label", encode_boolean("bool_label")).\ 
     drop('normalized_image') 

df.printSchema() 

出力

root 
|-- label: boolean (nullable = true) 
|-- is_train: boolean (nullable = true) 
|-- normalized_image: binary (nullable = true) 
に来ています
関連する問題