2015-10-15 14 views
5

parallelパッケージを使用してRで並列化コードを実行しており、定義済みのコア数をパラメータとするmclapplyが実行されています。並列計算中のコア数を変更するR

もし私が数日間稼動させる仕事をしているのであれば、サーバ時間がピークになる間により少ないコアを使用し、オフピークで使用量を増やすためにmclapply機能を書き込む(またはラップする)ピーク時間?

+2

を、私はあなたがmclapplyする層を追加する必要がありますね。基本的に、ジョブのリストをバッチで分割します。各バッチはmclapplyによって実行されます。新しいバッチを実行する前に、サーバーの状態を確認し、次のバッチを実行する適切なコア数を決定することができます。 –

+0

@KarlFornerはい、それもやり方です。 – cryo111

答えて

3

最も簡単な解決策は、データを小さなサイズのチャンクに分割し、それらのチャンクで別々にmclapplyを実行することです。次に、実行ごとのコア数をmclapplyに設定できます。これはおそらく、わずかな差異がある計算の方が良いでしょう。ランタイム。

私はこのように見えることができる方法の少し汚れ速くてモックアップ作成しました:

library(parallel) 
library(lubridate) 

#you would have to come up with your own function 
#for the number of cores to be used 
determine_cores=function(hh) { 
    #hh will be the hour of the day 
    if (hh>17|hh<9) { 
    return(4) 
    } else { 
    return(2) 
    } 
} 

#prepare some sample data 
set.seed(1234) 
myData=lapply(seq(1e-1,1,1e-1),function(x) rnorm(1e7,0,x)) 

#calculate SD with mclapply WITHOUT splitting of data into chunks 
#we need this for comparison 
compRes=mclapply(myData,function(x) sd(x),mc.cores=4) 

set.seed(1234) 
#this will hold the results of the separate mclapply calls 
res=list() 
#starting position within myData 
chunk_start_pos=1 
calc_flag=TRUE 

while(calc_flag) { 
    #use the function defined above to determine how many cores we may use 
    core_num=determine_cores(lubridate::hour(Sys.time())) 
    #determine end position of data chunk 
    chunk_end_pos=chunk_start_pos+core_num-1 
    if (chunk_end_pos>=length(myData)) { 
    chunk_end_pos=length(myData) 
    calc_flag=FALSE 
    } 
    message("Calculating elements ",chunk_start_pos," to ",chunk_end_pos) 
    #mclapply call on data chunk 
    #store data in res 
    res[[length(res)+1]]=mclapply(myData[chunk_start_pos:(chunk_start_pos+core_num-1)], 
           function(x) sd(x), 
           mc.preschedule=FALSE, 
           mc.cores=core_num) 
    #calculate new start position 
    chunk_start_pos=chunk_start_pos+core_num 
} 

#let's compare the results 
all.equal(compRes,unlist(res,recursive=FALSE)) 
#TRUE 
関連する問題