2017-06-28 1 views
2

私はDaskでSparkのunpersistと同等のものを見つけようとしています。それは目的を前処理するための多くの集計を計算する を必要とするため、呼び出し元のコンテキストがすでに例えば、大df続いているDaskでデータの重複がないインクリメンタルキャッシングを実現する方法は?

  • :明示的なunpersistのための私の必要性 状況で発生します。
  • 呼び出しコンテキストは、反復アルゴリズムを実行するため、persistを呼び出す必要がある関数、たとえば を呼び出します。

基本的な例は次のようになります。スパークで

def iterative_algorithm(df, num_iterations): 

    for iteration in range(num_iterations): 

     # Transformation logic requiring e.g. map_partitions 
     def mapper(df): 
      # ... 
      return df 

     df = df.map_partitions(mapper) 
     df = df.persist() 
     # Now I would like to explicitly unpersist the old snapshot 

    return df 

、問題は明示的に古いスナップショット を解放することによって解決することができます。明らかにDaskは明示的なunpersistを持っていませんが、基礎となる先物の参照カウントを介して という問題を処理します。これは、呼び出し元のコンテキストが古いフューチャーに対して参照 を保持し、サブファンクションが変更された への参照を保持しているため、上記の例 がデータを複製することを意味します。私の実際のユースケースでは、このような変換呼び出しにいくつかのネストされたレベルがあり、データが複数回も重複してしまいます。

追加コピーを行わずに反復キャッシュを解決する方法はありますか?

答えて

2

私はこれを解決する方法をいくつか掲載しますが、私はまだより良いものを探しています 代替案。

参照カウントのため、コピーを避けるのは難しいですが、 があります。この問題は、元のdfに参照番号 を保持している呼び出し元が、df = df.<method> 呼び出しによって新しいインスタンスを作成した結果になります。この問題を解決するには、df自体を に変更する必要があります。残念ながら、Pythonでは一般に関数の引数である の参照を変更することはできません。

解決策1:仕事の周りにするために、その制限ナイーブ変更可能な参照

最も簡単な方法は、リスト や辞書にdfをラップすることです。この場合、サブ機能は外部参照を変更することができる。 :

df_list[0] = df_list[0].map_partitions(mapper) 
df_list[0] = df_list[0].persist() 

しかし、これは構文的に厄介であるとdf = df_list[0]を経由して構文を簡素化 が再びデータの重複を引き起こす可能性が基礎となる先物を するための新しい参照を作成しますので、一つは、非常に慎重にする必要があります。

解決策2:ラッパーベースの可変リファレンス

一つのデータフレームを参照 を保持し、小さなラッパークラスを書くことができ、それに向上させることができます。このラッパーを通過すると、サブ関数は の参照を突然変異させることができます。構文上の問題を改善するために、ラッパー が機能をデータフレームに自動的に委譲するか、またはそれから を継承するかどうかを検討することができます。全体的にこの解決法も正しいとは思わない。

解決策3:私は現在、次の バリアントを好む他のソリューションのシンタックスの問題を回避するための明示的な変異

、効果的に元dfのインプレース修飾を介してmap_partitionspersistの可変バージョンをシミュレートインスタンス。

def modify_inplace(old_df, new_df): 
    # Currently requires accessing private fields of a DataFrame, but 
    # maybe this could be officially supported by Dask. 
    old_df.dask = new_df.dask 
    old_df._meta = new_df._meta 
    old_df._name = new_df._name 
    old_df.divisions = new_df.divisions 


def iterative_algorithm(df, num_iterations): 

    for iteration in range(num_iterations): 

     def mapper(df): 
      # Actual transform logic... 
      return df 

     # Simulate mutable/in-place map_partitions 
     new_df = df.map_partitions(mapper) 
     modify_inplace(df, new_df) 

     # Simulate mutable/in-place persist 
     new_df = df.persist() 
     modify_inplace(df, new_df) 

    # Technically no need to return, because all operations were in-place 
    return df 

これは私のために合理的にうまく動作しますが、慎重にこれらの規則に従うことが必要です。

  • は、上記のパターンによってdf = df.<method>などのすべての不変の呼び出しを交換してください。
  • dfへの参照を作成することに注意してください。たとえば、構文上の便宜のためにsome_col = df["some_sol"]のような変数を使用するには、persistを呼び出す前にdel some_colが必要です。さもなければ、some_colで保存されたリファレンスは再びデータの複製を引き起こします。
+1

あなたの 'modify_inplace'関数に' df.divisions'も含めたいと思うでしょう – MRocklin

2

次のようにして、リリース関数を書くことができます:

from distributed.client import futures_of 

def release(collection): 
    for future in futures_of(collection): 
     future.release() 

これは、現在のインスタンスを解放します。あなたはあなたの周りに横たわっこれらの先物の複数のインスタンスを持っている場合は、数回それを呼び出すか、次のようなループを追加する必要があります:

while future.client.refcount[future.key] > 0: 

しかし、一般的に、この複数回を呼び出すと、あなたの周りに浮い他のコピーを持っている場合には賢明と思われます理由付きで

関連する問題