2016-04-01 11 views
1

StandardScalerfrom pyspark.mllib.feature import StandardScaler)でデータを拡大したいのですが、RDDの値を変換関数に渡すことでこれを行うことができますが、キーを保持したいという問題があります。とにかくキーを保存してデータを拡大することはありますか?Sparkでグループ単位でデータをスケールすることはできますか?

サンプルデータセット

0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal. 
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal. 
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,smurf. 

輸入

import sys 
import os 
from collections import OrderedDict 
from numpy import array 
from math import sqrt 
try: 
    from pyspark import SparkContext, SparkConf 
    from pyspark.mllib.clustering import KMeans 
    from pyspark.mllib.feature import StandardScaler 
    from pyspark.statcounter import StatCounter 

    print ("Successfully imported Spark Modules") 
except ImportError as e: 
    print ("Can not import Spark Modules", e) 
    sys.exit(1) 

コードの一部

sc = SparkContext(conf=conf) 
    raw_data = sc.textFile(data_file) 
    parsed_data = raw_data.map(Parseline) 

Parseline機能:

def Parseline(line): 
    line_split = line.split(",") 
    clean_line_split = [line_split[0]]+line_split[4:-1] 
    return (line_split[-1], array([float(x) for x in clean_line_split])) 

答えて

3

正確な解決策ではありませんが、回答をthe similar Scala questionに調整できます。例のデータで開始できます:

import numpy as np 

np.random.seed(323) 

keys = ["foo"] * 50 + ["bar"] * 50 
values = (
    np.vstack([np.repeat(-10, 500), np.repeat(10, 500)]).reshape(100, -1) + 
    np.random.rand(100, 10) 
) 

rdd = sc.parallelize(zip(keys, values)) 

は残念ながらMultivariateStatisticalSummaryだけでJVMモデルのラッパーであり、それは本当にPythonの友好的ではありません。幸いにもnumpyのアレイと、私たちはキーで統計を計算するための標準的なStatCounterを使用することができます。

from pyspark.statcounter import StatCounter 

def compute_stats(rdd): 
    return rdd.aggregateByKey(
     StatCounter(), StatCounter.merge, StatCounter.mergeStats 
    ).collectAsMap() 

最後に、我々はmap正規化することができます。

def scale(rdd, stats): 
    def scale_(kv): 
     k, v = kv 
     return (v - stats[k].mean())/stats[k].stdev() 
    return rdd.map(scale_) 

scaled = scale(rdd, compute_stats(rdd)) 
scaled.first() 

## array([ 1.59879188, -1.66816084, 1.38546532, 1.76122047, 1.48132643, 
## 0.01512487, 1.49336769, 0.47765982, -1.04271866, 1.55288814]) 
+0

を私はそれが私にこのエラー「はTypeErrorを与えるこのコードを使用する場合:最初の 引数としてNoneTypeインスタンスを呼び出す必要があります(代わりにStatCounterインスタンスがあります) "、どういう考えですか? – Iman

+0

データに欠損値がありますか?どのような種類ですか? – zero323

+0

データの構造は次のようなものです[Label、array([数値浮動小数点値のリスト])、各ラベルは正常または攻撃、欠損値なし – Iman

関連する問題