2017-01-15 5 views
2

私は大規模なCSVやたくさんのたくさんの小さなXMLファイルを含む大きなファイルのファイルに対してたくさんのテキスト処理を行ってきました。時々私は集計を行っていますが、タグ付けされているか既に構造化されているものを超えて、これらのファイルにあるものをより深く見るためにNLP型の作業をしています。CSVの日付解析でスローダスクのパフォーマンスが発生しましたか?

私は、複数のCPUでこれらの計算を実行するためにマルチプロセッシングライブラリを多く使用してきましたが、私はDaskの背後にあるアイデアに惚れてしまいました。ネットと共同作業者の両方で強く推奨されます。

私はここDASKのパフォーマンスに関する同様の質問質問:私は小さなファイルのロードたくさんの可能性が高いパフォーマンスをゴミ箱に移動することを知らせて

Slow Performance with Python Dask bag?

とMRocklin(https://stackoverflow.com/users/616616/mrocklinを)。

しかし、私はそれを単一の大きなファイル(200MB)で実行しても、まだそれをうまく実行することはできません。ここに例があります:

私は900,000行のつぶやきのCSVファイルを持っています。私はすぐに読み込んで "created_at"フィールドを解析したいと思います。ここで私がそれをした3つの方法とそれぞれのベンチマークがあります。私は16GBのRAMを搭載した新しいi7 2016 MacBook Proでこれを実行しました。

import pandas 
import dask.dataframe as dd 
import multiprocessing 

%%time 
# Single Threaded, no chunking 
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", parse_dates = ["created_at"]) 
print(len(d)) 

CPU時間:ユーザーの2分の31S、SYS:807ミリ秒、全2分の32S 壁時間:2分32S

%%time 
# Multithreaded chunking 
def parse_frame_dates(frame): 
    frame["created_at"] = pandas.to_datetime(frame["created_at"]) 
    return(frame) 

d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", chunksize = 100000) 
frames = multiprocessing.Pool().imap_unordered(get_count, d) 
td = pandas.concat(frames) 
print(len(td)) 

CPU時間:ユーザー5.65秒、SYS:1.47秒、全7.12秒 ウォール時間:1分10秒の

%%time 
# Dask Load 
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
       parse_dates = ["created_at"], blocksize = 10000000).compute() 

CPU時間:ユーザーの2分の59S、SYS:26.2秒、合計:3分25秒 ウォール時間:3分12S

多くの異なるDask比較でこれらの種類の結果が見つかりましたが、これを正しく動作させることで正しい方向に向けるかもしれません。

要するに、これらの種類のタスクのためにDaskからどのように最高のパフォーマンスを引き出すことができますか?シングルスレッドとマルチスレッドの両方で、他の方法でパフォーマンスが低下するのはなぜですか?

答えて

2

私は、Pandasのread_csv日時解析コードが純粋なPythonであると考えています。したがって、dask.dataframeがデフォルトで使用するものであるスレッドの使用にはあまり効果がありません。

プロセスを使用すると、パフォーマンスが向上する場合があります。

私は次は高速に動作することを疑う:

import dask.multiprocessing 
dask.set_options(get=dask.multiprocessing.get) # set processes as default 

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
       parse_dates = ["created_at"], blocksize = 10000000) 
len(d) 

プロセスに問題がプロセス間通信が高価になることができるということです。私はワーカープロセスのすべてのパンダデータフレームをピックアップしてメインの呼び出しプロセスに移動させないために、d.compute()ではなく、len(d)を明示的に計算しています。実のところ、これはかなり一般的です。なぜなら、人々はまれに完全なデータフレームを望んでいるのですが、データフレーム上ではいくらか計算する必要があるからです。

ここでは関係docpageあなたはまた、単一のマシンにdistributed schedulerを使用するのではなくマルチプロセッシング・スケジューラを使用する場合がありますhttp://dask.readthedocs.io/en/latest/scheduler-choice.html

です。これは上で参照したドキュメントにも記述されています。

$ pip install dask distributed 

from dask.distributed import Client 
c = Client() # create processes and set as default 

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
       parse_dates = ["created_at"], blocksize = 10000000) 
len(d) 
+1

タイミングの方法、OPは同じではありません。 '' parse_dates = ... ''を渡すことはかなり堅牢な方法ですが、私はPythonの構文解析を遅くする必要があります。ほとんどの場合、単純にcsvを読み込み、その後、.to_datetimeを使って後処理する必要があります。特に、日付が何であるかに応じてformat =引数やその他のオプションを使用する必要があります。 、YMMV。実際、この方法は特にFYIに優しいです(これらは個別タスクですが、順次タスクです)。 – Jeff

+1

ありがとう、マシュー!それは大きな違いをもたらしました。私の例では、マルチスレッドになると、シングルスレッドの場合は2分、Daskの場合は30秒になりました。私が期待していたラインのほうがはるかに多くなっています。私は、スケジューラの選択と分散スケジューラの使用についての情報を調べます。私は明らかに私の宿題のすべてをやっていなかった。再度、感謝します! –

関連する問題