2017-09-27 8 views
0

daskfbprophetライブラリを一緒に使用しようとしていますが、何か間違っているか、予期しないパフォーマンスの問題があります。Daskとfbprophet

import dask.dataframe as dd 
import datetime as dt 
import multiprocessing as mp 
import numpy as np 
import pandas as pd 
pd.options.mode.chained_assignment = None 
from fbprophet import Prophet 
import time 
ncpu = mp.cpu_count() 

def parallel_pd(fun, vec, pool = ncpu-1): 
    with mp.Pool(pool) as p: 
     res = p.map(fun,vec) 
    return(res) 

def forecast1dd(ts): 
    time.sleep(0.1) 
    return ts["y"].max() 

def forecast1mp(key): 
    ts = df[df["key"]==key] 
    time.sleep(0.1) 
    return ts["y"].max() 

def forecast2dd(ts): 
    future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1), 
                periods=7, freq="D")}) 
    key = ts.name 
    model = Prophet(yearly_seasonality=True) 
    model.fit(ts) 
    forecast = model.predict(future) 
    future["yhat"] = forecast["yhat"] 
    future["key"] = key 
    return future.as_matrix() 

def forecast2mp(key): 
    ts = df[df["key"]==key] 
    future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1), 
                periods=7, freq="D")}) 
    model = Prophet(yearly_seasonality=True) 
    model.fit(ts) 
    forecast = model.predict(future) 
    future["yhat"] = forecast["yhat"] 
    future["key"] = key 
    return future.as_matrix() 

は片側で、私は私の機能をシミュレートし、次のデータフレームのために、約0.1秒そうforecast1ddforecast1mpで実行されるカスタム機能を有する

N = 2*365 
key_n = 5000 
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"), 
        "y":np.random.normal(100,20,N), 
        "key":np.repeat(str(k),N)}) for k in range(key_n)]) 
keys = df.key.unique() 
df = df.sample(frac=1).reset_index(drop=True) 
ddf = dd.from_pandas(df, npartitions=ncpu*2) 

Iは、(それぞれ)を得

%%time 
grp = ddf.groupby("key").apply(forecast1dd, meta=pd.Series(name="s")) 
df1dd = grp.to_frame().compute() 
CPU times: user 7.7 s, sys: 400 ms, total: 8.1 s 
Wall time: 1min 8s 

%%time 
res = parallel_pd(forecast1mp,keys) 
CPU times: user 820 ms, sys: 360 ms, total: 1.18 s 
Wall time: 10min 36s 

最初のケースでは、コアは100%で使用されていませんが、パフォーマンスは私の実際の状況に沿っています。ラインプロファイラを使用して、2番目のケースでのパフォーマンス低下の原因がts = df[df["key"]==key]であることを簡単に確認することができます。さらにキーがあると最悪になります。

今まで私はdaskに満足しています。しかし、いつでも私が使用しようとするとfbprophetものが変わります。ここで私はより少ないkeysを使用しますが、前のケースdaskのパフォーマンスは常にmultiprocessingよりも悪いとは考えにくいでしょう。

N = 2*365 
key_n = 200 
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"), 
        "y":np.random.normal(100,20,N), 
        "key":np.repeat(str(k),N)}) for k in range(key_n)]) 

keys = df.key.unique() 
df = df.sample(frac=1).reset_index(drop=True) 
ddf = dd.from_pandas(df, npartitions=ncpu*2) 

%%time 
grp = ddf.groupby("key").apply(forecast2dd, 
meta=pd.Series(name="s")).to_frame().compute() 
df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values]) 
CPU times: user 3min 42s, sys: 15 s, total: 3min 57s 
Wall time: 3min 30s 

%%time 
res = parallel_pd(forecast2mp,keys) 
df2mp = pd.concat([pd.DataFrame(a) for a in res]) 
CPU times: user 76 ms, sys: 160 ms, total: 236 ms 
Wall time: 39.4 s 

は今私の質問は以下のとおりです。

  • どのように私はDASKと預言者のパフォーマンスを向上させることができますか?
  • コアを100%使用してマスクを作成するにはどうすればよいですか?

答えて

1

私はProphetがGILを保持していると思われますので、ddf.groupby("key").apply(forecast2dd, meta=pd.Series(name="s")を計算すると、ただ1つのスレッドだけが一度にPythonコードを実行できます。 multiprocessingを使用すると、データをコピーすることを犠牲にして、これを回避できますncpu回。これはあなたのparallel_pd関数と同じランタイムを持つ必要があります。

%%time 
with dask.set_options(get=dask.multiprocessing.get): 
    grp = ddf.groupby("key").apply(forecast2dd, 
     meta=pd.Series(name="s")).to_frame().compute() 

df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values]) 

CPU times: user 2.47 s, sys: 251 ms, total: 2.72 s 
Wall time: 1min 27s 

GILを保持する必要がある場合、預言者の開発者に尋ねることができます。私はこの問題がPyStanであり、実際のStanソルバが実行されているときにGILが必要ないと思われます。 here


サイドノートではGithubの問題にあります:これは実際に適合していませんが

%%time 

def forcast1dd_chunk(ts): 
    time.sleep(0.1) 
    return ts.max() 

def forecast1dd_agg(ts): 
    return ts.max() 

f1dd = dd.Aggregation("forecast1dd", forcast1dd_chunk, forecast1dd_agg) 

grp = ddf.groupby("key")[['y']].agg(f1dd) 
x = grp.compute() 

CPU times: user 59.5 ms, sys: 5.13 ms, total: 64.7 ms 
Wall time: 355 ms 

:あなたのサンプルforecast1ddが集約であるため、それははるかに迅速dd.Aggregationを使用して実行することができます問題であり、集約ではありません。

+0

こんにちはTom、私はあなたの方法を試してみました。また、 'dask.distributed import Client'から、' client = Client() 'とパフォーマンスはほとんど同じです。問題は 'predict2dd'で' key_n = 5000'を使用するたびに次のエラーが発生することです: 'OSError:[Errno 24]開いているファイルが多すぎます: '/ dev/null' – user32185

+0

OSError:[Errno 24 ]端末から '' ulimit -Sn 10000'を起動するファイルが多すぎます。 – user32185

関連する問題