2017-09-22 14 views
2

同じスキーマを持つ複数の寄せ木ファイルを単一のデータフレームに読み込むには、daskを使用する必要があります。これは、すべて同じディレクトリにあるときに機能しますが、別々のディレクトリにあるときには機能しません。例えばdask/fastparquetを使用して複数のディレクトリから複数のparquetファイル(同じスキーマ)を読み取る方法

import fastparquet 
pfile = fastparquet.ParquetFile(['data/data1.parq', 'data/data2.parq']) 

うまく動作しますが、私は別のディレクトリにdata2.parqをコピーする場合は、次のように動作しません:

pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq']) 

私が手にトレースバックは以下の通りです:

--------------------------------------------------------------------------- 
ValueError        Traceback (most recent call last) 
<ipython-input-11-b3d381f14edc> in <module>() 
----> 1 pfile = fastparquet.ParquetFile(['data/data1.parq', 'data2/data2.parq']) 

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, sep) 
    82   if isinstance(fn, (tuple, list)): 
    83    basepath, fmd = metadata_from_many(fn, verify_schema=verify, 
---> 84            open_with=open_with) 
    85    self.fn = sep.join([basepath, '_metadata']) # effective file 
    86    self.fmd = fmd 

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with) 
    164  else: 
    165   raise ValueError("Merge requires all PaquetFile instances or none") 
--> 166  basepath, file_list = analyse_paths(file_list, sep) 
    167 
    168  if verify_schema: 

~/anaconda/envs/hv/lib/python3.6/site-packages/fastparquet/util.py in analyse_paths(file_list, sep) 
    221  if len({tuple([p.split('=')[0] for p in parts[l:-1]]) 
    222    for parts in path_parts_list}) > 1: 
--> 223   raise ValueError('Partitioning directories do not agree') 
    224  for path_parts in path_parts_list: 
    225   for path_part in path_parts[l:-1]: 

ValueError: Partitioning directories do not agree 

dask.dataframe.read_parquetを使用すると同じエラーが発生します。私は同じParquetFileオブジェクトを使用すると仮定します。

異なるディレクトリから複数のファイルを読み込むにはどうすればよいですか?ロードする必要があるすべてのファイルを同じディレクトリに置くことはオプションではありません。

答えて

3

これは、絶対パスまたは明示的な相対パスのいずれかを使用している場合、マスター上fastparquetで作業を行います。

2

回避策は、各チャンクを別々に読み取り、dask.dataframe.from_delayedに渡すことです。これはread_parquet(正確には'index'がインデックスである必要があります)と同じメタデータ処理を行いませんが、そうでなければ動作するはずです。大手./ため

pfile = fastparquet.ParquetFile(['./data/data1.parq', './data2/data2.parq']) 

必要はバグと考えるべきである - 問題を参照してください。

import dask.dataframe as dd  
from dask import delayed  
from fastparquet import ParquetFile 

@delayed 
def load_chunk(pth): 
    return ParquetFile(pth).to_pandas() 

files = ['temp/part.0.parquet', 'temp2/part.1.parquet'] 
df = dd.from_delayed([load_chunk(f) for f in files]) 

df.compute() 
Out[38]: 
    index a 
0  0 1 
1  1 2 
0  2 3 
1  3 4 
+0

github issue - https://github.com/dask/fastparquet/issues/217 – chrisb

関連する問題