dask
とfbprophet
ライブラリを一緒に使用しようとしていますが、何か間違っているか、予期しないパフォーマンスの問題があります。Daskとfbprophet
import dask.dataframe as dd
import datetime as dt
import multiprocessing as mp
import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None
from fbprophet import Prophet
import time
ncpu = mp.cpu_count()
def parallel_pd(fun, vec, pool = ncpu-1):
with mp.Pool(pool) as p:
res = p.map(fun,vec)
return(res)
def forecast1dd(ts):
time.sleep(0.1)
return ts["y"].max()
def forecast1mp(key):
ts = df[df["key"]==key]
time.sleep(0.1)
return ts["y"].max()
def forecast2dd(ts):
future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1),
periods=7, freq="D")})
key = ts.name
model = Prophet(yearly_seasonality=True)
model.fit(ts)
forecast = model.predict(future)
future["yhat"] = forecast["yhat"]
future["key"] = key
return future.as_matrix()
def forecast2mp(key):
ts = df[df["key"]==key]
future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1),
periods=7, freq="D")})
model = Prophet(yearly_seasonality=True)
model.fit(ts)
forecast = model.predict(future)
future["yhat"] = forecast["yhat"]
future["key"] = key
return future.as_matrix()
は片側で、私は私の機能をシミュレートし、次のデータフレームのために、約0.1秒そうforecast1dd
とforecast1mp
で実行されるカスタム機能を有する
N = 2*365
key_n = 5000
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"),
"y":np.random.normal(100,20,N),
"key":np.repeat(str(k),N)}) for k in range(key_n)])
keys = df.key.unique()
df = df.sample(frac=1).reset_index(drop=True)
ddf = dd.from_pandas(df, npartitions=ncpu*2)
Iは、(それぞれ)を得
%%time
grp = ddf.groupby("key").apply(forecast1dd, meta=pd.Series(name="s"))
df1dd = grp.to_frame().compute()
CPU times: user 7.7 s, sys: 400 ms, total: 8.1 s
Wall time: 1min 8s
%%time
res = parallel_pd(forecast1mp,keys)
CPU times: user 820 ms, sys: 360 ms, total: 1.18 s
Wall time: 10min 36s
最初のケースでは、コアは100%で使用されていませんが、パフォーマンスは私の実際の状況に沿っています。ラインプロファイラを使用して、2番目のケースでのパフォーマンス低下の原因がts = df[df["key"]==key]
であることを簡単に確認することができます。さらにキーがあると最悪になります。
今まで私はdask
に満足しています。しかし、いつでも私が使用しようとするとfbprophet
ものが変わります。ここで私はより少ないkeys
を使用しますが、前のケースdask
のパフォーマンスは常にmultiprocessing
よりも悪いとは考えにくいでしょう。
N = 2*365
key_n = 200
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"),
"y":np.random.normal(100,20,N),
"key":np.repeat(str(k),N)}) for k in range(key_n)])
keys = df.key.unique()
df = df.sample(frac=1).reset_index(drop=True)
ddf = dd.from_pandas(df, npartitions=ncpu*2)
%%time
grp = ddf.groupby("key").apply(forecast2dd,
meta=pd.Series(name="s")).to_frame().compute()
df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values])
CPU times: user 3min 42s, sys: 15 s, total: 3min 57s
Wall time: 3min 30s
%%time
res = parallel_pd(forecast2mp,keys)
df2mp = pd.concat([pd.DataFrame(a) for a in res])
CPU times: user 76 ms, sys: 160 ms, total: 236 ms
Wall time: 39.4 s
は今私の質問は以下のとおりです。
- どのように私はDASKと預言者のパフォーマンスを向上させることができますか?
- コアを100%使用してマスクを作成するにはどうすればよいですか?
こんにちはTom、私はあなたの方法を試してみました。また、 'dask.distributed import Client'から、' client = Client() 'とパフォーマンスはほとんど同じです。問題は 'predict2dd'で' key_n = 5000'を使用するたびに次のエラーが発生することです: 'OSError:[Errno 24]開いているファイルが多すぎます: '/ dev/null' – user32185
OSError:[Errno 24 ]端末から '' ulimit -Sn 10000'を起動するファイルが多すぎます。 – user32185