2017-05-01 2 views
2

サイズ10,000×800,000の行列Xについて、行列積Y = XX^Tを計算しようとしています。行列Xはディスク上のh5pyファイルに格納されます。結果として得られるYは、同じh5pyファイルに格納された10,000 * 10,000の行列でなければなりません。ここに再現可能なサンプルコードがあります。コア外マトリックスの行列乗算スケジューリングを一掃する

import dask.array as da 
from blaze import into 

into("h5py:///tmp/dummy::/X", da.ones((10**4, 8*10**5), chunks=(10**4,10**4))) 
x = into(da.Array, "h5py:///tmp/dummy::/X", chunks=(10**4,10**4))) 
y = x.dot(x.T) 
into("h5py:///tmp/dummy::/Y", y) 

私はそれぞれ(10,000 * 10,000)チャンクは個別に、転置内積が続いた後、最終的な結果にまとめるべきであるとして、この計算はスムーズに行くことを期待していました。しかし、この計算を実行すると、最終的にプロセスが終了するまで私のRAMとスワップメモリ​​の両方が満たされます。ここで

は、計算グラフのサンプルがdot_graphでプロットである:私は期待//dask.pydata.org/en/latest/scheduling-policy.html ます:http shedulingドキュメントによるとComputation graph sample

上テンドルード中間結果は、個々に計算されるとすぐに最終的な結果に1つずつ合計される。これにより、これらテンポドット中間結果のメモリが解放されるため、メモリエラーに直面することはありません。小さなおもちゃの例で遊ん

:黄色と紫のバーがget_arrayとtensordotそれぞれ表している緑色のバーは、和演算を表し Ressource profiler

from dask.diagnostics import Profiler, CacheProfiler, ResourceProfiler 

# Experiment on a (1,0000 * 5,000) matrix X split into 500 chunks of size (1,000 * 10) 
x = into(da.Array, "h5py:///tmp/dummy::/X", chunks=(10**3,10)))[:10**3,5000] 
y = x.T.dot(x) 
with Profiler() as prof, CacheProfiler() as cprof, ResourceProfiler() as rprof: 
    into("h5py:///tmp/dummy::/X", y) 
rprof.visualize() 

私は次のような表示を得ますオペレーション。これは、和演算が、すべての中間テンソル演算が合計する前に実行されるのを待つことを示しているようです。これはまた私のプロセスがメモリ不足になって死に至ったことを説明します。

だから私の質問は以下のとおりです。

  • は、和演算の正常な動作ですか?
  • 中間のテンソル積が計算されてメモリに保存される前に、中間合計を計算する方法がありますか?
  • そうでない場合は、ディスクへのこぼれには関係しない回避策がありますか?

大変助かりました!

答えて

0

一般的に言えば、小さな空間での稠密なマトリックス - マトリックス乗算の実行は難しいです。これは、すべての中間チャンクがいくつかの出力チャンクで使用されるためです。

HTTP shedulingドキュメントによると://dask.pydata.org/en/latest/scheduling-policy.html私は、上tensordot仲介結果は最後の和の中に一つ一つをまとめることを期待します個別に計算されるとすぐに結果が返されます。

あなたが示したグラフには、sum関数への入力が多数あります。 Daskは、sum関数を実行する前にこれらの入力がすべて完了するまで待機します。タスクスケジューラはsumが連想的であり、ピース単位で実行できるということを知らない。この意味情報の欠如は、専用の線形代数ライブラリではなく、Daskのような一般的なタスクスケジューリングシステムを使用して支払う代金です。あなたの目標が高密度線形代数を可能な限り効率的に実行することであれば、他の場所を見たいかもしれません。これはよく覆われたフィールドです。だから、

書かれたあなたのメモリ要件は、(それがはほとんど行う必要があります)を正確に正しい順序でそのDASKが進むと仮定すると、少なくとも8e5 * 1e4 * dtype.itemsizeあるとして。

あなたは次のことを試してみてください:

  1. (0.14.2が5月5日、2017年にリリースされなければならない、後で0.14.1よりDASKのバージョンを使用して非契約次元
  2. に沿ってチャンクを削減します)ここで、これらの大きな合計呼び出しをグラフ内の明示的に多数の小さな呼び出しに分割します。
  3. ディスクへのデータ書き込みをより効率的に処理する分散スケジューラを使用します。

    from dask.distributed import Client 
    client = Client(processes=False) # create a local cluster in this process 
    
+0

どうもありがとう@MRocklin! – Grin

+0

オプション3については、https://github.com/dask/distributed/issues/927に記載されている問題が発生します。 Hdf5シリアライズされた配列は、分散スケジューラーでうまく機能していないようです。それを修正すると、分散スケジューラのライブ診断/モニタリングツールを使用することができ、このような場合には非常に便利です。また、pydata/pythonエコシステムでは、特殊なooc線形代数ライブラリについて知っていますか?私はもともとdumpを使用し始めました。なぜなら、Numpy APIと組み合わされたコア線形代数機能がないからです。 – Grin