497パンダのデータフレームが.parquetファイルとして格納されたフォルダがあります。フォルダの合計サイズは7.6GBです。Dask.delayedはクラス内の.compute()クラスではありません
私は単純な取引システムを開発しようとしています。したがって、私は2つの異なるクラスを作成します。主なものはPortfolioクラスです。このクラスは、データフォルダ内のすべての単一データフレームに対してAssetオブジェクトを作成します。
import os
import pandas as pd
from dask.delayed import delayed
class Asset(file):
def __init__:
self.data_path = 'path\\to\\data\\folder\\'
self.data = pd.read_parquet(self.data_path + file, engine='auto')
class Portfolio:
def __init__:
self.data_path = 'path\\to\\data\\folder\\'
self.files_list = [file for file in os.listdir(self.data_path) if file.endswith('.parquet')]
self.assets_list = []
self.results = None
self.shared_data = '???'
def assets_loading(self):
for file in self.files_list:
tmp = Asset(file)
self.assets_list.append(tmp)
def dask_delayed(self):
for asset in self.assets_list:
backtest = delayed(self.model)(asset)
def dask_compute(self):
self.results = delayed(dask_delayed)
self.results.compute()
def model(self, asset):
# do shet
if __name__ == '__main__':
portfolio = Portfolio()
portfolio.dask_compute()
結果が処理されていないように見えます。私はチェックしようとした場合、コンソール版画portfolio.results:
Out[5]: Delayed('NoneType-7512ffcc-3b10-445f-928a-f01c01bae29c')
だからここは私の質問は以下のとおりです。
- あなたは間違っているものを私に説明できますか?
- assets_loading()関数を実行すると、基本的に高速の処理速度のためにデータフォルダ全体がメモリにロードされますが、RAM(16GB)が飽和します。私は7.6GBのフォルダが16GBのRAMを飽和させるとは思わなかったので、Daskを使いたいのです。私のスクリプトワークフローと互換性のあるソリューションはありますか?
- もう1つ問題があり、おそらく大きな問題があります。 Daskでは、同時に複数のアセットに渡ってモデル関数を並列化しようとしていますが、各Daskプロセス内にある変数の値をPortfolioオブジェクトに保存するために共有メモリ(スクリプト内のself.shared_data)が必要ですたとえば、単一資産の年間パフォーマンス)。 Dask遅延プロセス間でデータを共有する方法と、このデータをPortfolioの変数に格納する方法を教えてください。
ありがとうたくさん。