2017-12-13 8 views
2

私はsparklyrのクラスタリング問題を解決しようとしています。トレーニングセットの変数の多くは、異なるスケールで測定され、したがって、桁違いに異なる。ベストプラクティスでは、私はデータをスケールして中心化しようとしています。mutate_eachを使用してスパークリヤの変数をスケールおよびセンタリングする

Xをランダム変数、μ=平均、σ=標準偏差とする最も伝統的な式(X - μ)/σがあります。私はX =確率変数、x =標本平均、x_max =最大値、x_min =最小値で(X-x)/(x_max-x_min)を使う傾向があります。

dplyrを使用してこの変換を適用した後、私は奇妙な結果を得ています。この例を考えてみましょう:

#connect to spark 
    library(sparklyr) 
    library(SparkR) 
    library(dplyr) 
    sc = spark_connect(master = 'yarn-client', 
         spark_home = '/usr/hdp/current/spark-client', 
         app_name = 'sparklyr' 
         # config = list(
         # "sparklyr.shell.executor-memory" = "XG", 
         # "sparklyr.shell.driver-memory" = "XG", 
         # "spark.driver.maxResultSize"  = "XG" # may need to transfer a lot of data into R 
    ) 

    sparkR.init() 

#create a dataframe where variables in the dataset differ by an order of magnitude 
mat <- as.data.frame(matrix(data = rnorm(200, mean=100,sd=10), nrow = 1000, ncol = 50)) 
mat1 <- as.data.frame(matrix(data = rnorm(200, mean=0,sd=1), nrow = 1000, ncol = 50)) 
colnames(mat1) <- paste('X',1:50,sep='') 
mat.final <- cbind(mat,mat1) 

#copy to Spark 
dat.out <- sdf_copy_to(sc,mat.final,'dat',overwrite=TRUE) 

#define centering and scaling function 
scale.center <- function(x){ 
    (x-mean(x,na.rm=TRUE)) /(max(x,na.rm = TRUE)-min(x,na.rm = TRUE)) 
} 

#scale data 
dat.out1 <- 
    dat.out %>% 
    mutate_each(funs(s=scale.center)) 

コードが実行されますが、私は奇妙な何かを得る:

str(dat.out1) 

$ ops:List of 4 
    ..$ name: chr "mutate" 
    ..$ x :List of 4 
    .. ..$ name: chr "mutate" 
    .. ..$ x :List of 4 
    .. .. ..$ name: chr "mutate" 
    .. .. ..$ x :List of 4 
    .. .. .. ..$ name: chr "mutate" 
    .. .. .. ..$ x :List of 4 
    .. .. .. .. ..$ name: chr "mutate" 
    .. .. .. .. ..$ x :List of 4 
    .. .. .. .. .. ..$ name: chr "mutate" 
    .. .. .. .. .. ..$ x :List of 4 
    .. .. .. .. .. .. ..$ name: chr "mutate" 
    .. .. .. .. .. .. ..$ x :List of 4 
    .. .. .. .. .. .. .. ..$ name: chr "mutate" 
    .. .. .. .. .. .. .. ..$ x :List of 4 
    .. .. .. .. .. .. .. .. ..$ name: chr "mutate" 
    .. .. .. .. .. .. .. .. ..$ x :List of 4 
    .. .. .. .. .. .. .. .. .. ..$ name: chr "mutate" 
    .. .. .. .. .. .. .. .. .. ..$ x :List of 4 
    .. .. .. .. .. .. .. .. .. .. ..$ name: chr "mutate" 
    .. .. 

上記はstrを実行した後、出力のほんの一部です。ここで何がうまくいかないかについての考え。私はセンタリングとスケーリングのための機能が組み込まれていないことに驚いています。

+0

'?scale'は基底付きで出荷されます – hrabel

+0

SparkRでは実装されていません。 – schristel

+0

ああ、申し訳ありません。したがって、 'scale'はあなたのコードに上記のエラーを生成しますか? 'mutate_each'は' mutate_all'のために廃止されているのではないのですか? – hrabel

答えて

3

ここで何が問題になっているかを考えてください。

それは本質的Sparklyr: how to center a Spark table based on column?に記載したのと同じ問題である - mutateに使用される集約関数は、(PARTITION BY句なし)グローバルに展開された窓関数実際に、このアプローチは完全に無用になります。

センタリングとスケーリングの機能が組み込まれていないのは驚きです。

まあ、一般的なスパークでML Transformerssparklyrに移植されたの束を使用して動作します。これらは接頭辞ft_で区別することができます。残念ながらStandardScalerMinMaxScalerはまだ移植されていません。しかし、自分のインターフェースを実装するのは難しいことではありません。

あなたがあるとして、データ上で動作することができ、迅速たい場合:

library(rlang) 
library(glue) 

# Compute all the stats at once 
stats <- dat.out %>% summarise_all(funs(avg, min, max)) %>% collect() 

# Separate stats into components 
cols <- dat.out %>% colnames() 
avgs <- stats %>% select(ends_with("avg")) %>% unlist 
mins <- stats %>% select(ends_with("min")) %>% unlist 
maxs <- stats %>% select(ends_with("max")) %>% unlist 

# Create expressions 
exprs <- glue("({cols} - {avgs})/({maxs} - {mins})") %>% 
    setNames(cols) %>% 
    lapply(parse_quosure) 

dat.out %>% mutate(!!! exprs) 

クレジットが再びArtem Sokolovdplyr 0.7 equivalent for deprecated mutate_)に行くの。

sparklyrで使用される機能を.を使用しないでください。 dplyrは、これらを "接頭辞"データベースのデータベース関数として照合しようとし、失敗したり、意図しない結果を生成したりします。

+0

私はこの解決策で唯一の問題は、スケールがよくないように見えることです。私の**実際の**データは500列と1,200万行あります。 'stats <- dat.out %>%summarise_all(funs(avg、min、max))%>%collect()'はすでに5分間以上実行されています。 – schristel

+0

5分:1時間:/ – schristel

+0

データはハイブテーブルにあります。 Spark Rに接続するための設定は 'sc = spark_connect(master = 'yarn-client'、 spark_home = '/ usr/hdp/current/spark-client'、 app_name = 'sparklyr'、 config = list( )です。 "sparklyr.shell.executorメモリ" = "24G"、 "sparklyr.shell.driverメモリ" = "16G"、 "spark.driver.maxResultSize" = "16G") ) sparkR.init( ) '私はdplyr関数' tbl'を使ってハイブからデータを読んでいます – schristel

関連する問題