2017-06-23 10 views
1

this答えによれば、データフレームのインデックスがソートされていることをDaskが認識している場合、Daskデータフレームはスマートインデックスを実行できます。インデックスがソートされていることをDaskに知らせる方法は?

インデックスがソートされているかどうかをDaskに知らせるにはどうすればよいですか?

私はこのような何かやっている私の特定の状況では

:私はDASKがないことを前提としてい

dd = dask.dataframe.read_hdf(some_glob, '/data') 
print(dd.loc['2001-1-1':'2001-1-2']) 

for source is sources: 
    # This df has a datetimeindex that I know to be sorted 
    pd = load_pandas_df_from_some_source(source) 
    dd = dask.dataframe.from_pandas(pd, chunksize=foo) 
    dd.to_hdf(some_unique_filename, '/data') 

私はこのような何かをするときただし、インデックスが非常に遅いですが私のデータフレームがソートされていることを知っている。どうすればそれを知らせることができますか?

+0

「Apache Parquet」を優先しますか?つまり、HDF5は、1つのファイルとして保存する必要があるという点で限定されていますが、Parquetファイルは配布することができます。起こっていることは、DaskがHDF5ファイル全体を再読み込みしてDataFrameに変換する必要があることです。これは '.loc'ではなく長い時間がかかります。それが事実かもしれないかどうか観察するために2つのステップを分けましたか? – kuanb

答えて

2

HDFからロードすると、各パーティション内のインデックスのデータ値が必ずしもわかっているとは限りません。これらは、ルックアップを加速するために使用されるdaataframeの属性divisionsを構築するために使用されます。

あなたのようなデータセットについては、sorted_index=Trueを渡して、必要な動作を得ることができるはずです。

@kuanbが示唆しているように、表形式のデータ専用に設計された寄木張りの形式で保存してみてください。より多くのパフォーマンスを提供するかどうかは、データの性質(hdfは主に数値データ用に書かれたもの)とユースケースymmvに依存します。ただし、パーケットは一般に、各パーティションのデータ値のメタデータ統計を保持するのに適しています。

0

@murantが示唆しているように、sorted_index= keyword to the read_hdf functionを使用するのが理想的です。

さらに一般的には、set_index functionを使用して、他の方法で作成されたものであっても、任意のデータフレームでインデックスを設定できます。この関数には、新しい索引列がすでにソートされている場合、およびすでにパーティション間の分離値が分かっている場合に効率的な新しいキーワードがあります。ここに現在のdocstringがあります。最後の例は、あなたにとって興味深いかもしれません。

"""Set the DataFrame index (row labels) using an existing column 

This realigns the dataset to be sorted by a new column. This can have a 
significant impact on performance, because joins, groupbys, lookups, etc. 
are all much faster on that column. However, this performance increase 
comes with a cost, sorting a parallel dataset requires expensive shuffles. 
Often we ``set_index`` once directly after data ingest and filtering and 
then perform many cheap computations off of the sorted dataset. 

This function operates exactly like ``pandas.set_index`` except with 
different performance costs (it is much more expensive). Under normal 
operation this function does an initial pass over the index column to 
compute approximate qunatiles to serve as future divisions. It then passes 
over the data a second time, splitting up each input partition into several 
pieces and sharing those pieces to all of the output partitions now in 
sorted order. 

In some cases we can alleviate those costs, for example if your dataset is 
sorted already then we can avoid making many small pieces or if you know 
good values to split the new index column then we can avoid the initial 
pass over the data. For example if your new index is a datetime index and 
your data is already sorted by day then this entire operation can be done 
for free. You can control these options with the following parameters. 

Parameters 
---------- 
df: Dask DataFrame 
index: string or Dask Series 
npartitions: int, None, or 'auto' 
    The ideal number of output partitions. If None use the same as 
    the input. If 'auto' then decide by memory use. 
shuffle: string, optional 
    Either ``'disk'`` for single-node operation or ``'tasks'`` for 
    distributed operation. Will be inferred by your current scheduler. 
sorted: bool, optional 
    If the index column is already sorted in increasing order. 
    Defaults to False 
divisions: list, optional 
    Known values on which to separate index values of the partitions. 
    See http://dask.pydata.org/en/latest/dataframe-design.html#partitions 
    Defaults to computing this with a single pass over the data. Note 
    that if ``sorted=True``, specified divisions are assumed to match 
    the existing partitions in the data. If this is untrue, you should 
    leave divisions empty and call ``repartition`` after ``set_index``. 
compute: bool 
    Whether or not to trigger an immediate computation. Defaults to False. 

Examples 
-------- 
>>> df2 = df.set_index('x') # doctest: +SKIP 
>>> df2 = df.set_index(d.x) # doctest: +SKIP 
>>> df2 = df.set_index(d.timestamp, sorted=True) # doctest: +SKIP 

A common case is when we have a datetime column that we know to be 
sorted and is cleanly divided by day. We can set this index for free 
by specifying both that the column is pre-sorted and the particular 
divisions along which is is separated 

>>> import pandas as pd 
>>> divisions = pd.date_range('2000', '2010', freq='1D') 
>>> df2 = df.set_index('timestamp', sorted=True, divisions=divisions) # doctest: +SKIP 
    """ 
関連する問題