2017-12-28 15 views
1

私は、寄木細工の形式でやや大きな(約20 GB)パーティションデータセットを持っています。 pyarrowを使用して、データセットから特定のパーティションを読みたいと思います。私はpyarrow.parquet.ParquetDatasetでこれを達成できると思ったが、それはそうではないようだ。ここに、私が欲しいものを説明するための小さな例があります。pyarrowで区切られた寄木細工のデータセットから特定の区画を読み取る

ランダムデータセットを作成するには:私は、パーティション1のすべての値を読み、そしてpandas.read_parquetとパーティション2のための唯一の真たい

from collections import OrderedDict 
from itertools import product, chain 
from uuid import uuid4 
import os 
from glob import glob 

import numpy as np 
import pandas as pd 
import pyarrow as pa 
from pyarrow.parquet import ParquetWriter, ParquetDataset 


def get_partitions(basepath, partitions): 
    """Generate directory hierarchy for a paritioned dataset 

    data 
    ├── part1=foo 
    │ └── part2=True 
    ├── part1=foo 
    │ └── part2=False 
    ├── part1=bar 
    │ └── part2=True 
    └── part1=bar 
     └── part2=False 

    """ 
    path_tmpl = '/'.join(['{}={}'] * len(partitions)) # part=value 
    path_tmpl = '{}/{}'.format(basepath, path_tmpl) # part1=val/part2=val 

    parts = [product([part], vals) for part, vals in partitions.items()] 
    parts = [i for i in product(*parts)] 
    return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts] 


partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False]) 
parts = get_partitions('data', partitions) 
for part in parts: 
    # 3 columns, 5 rows 
    data = [pa.array(np.random.rand(5)) for i in range(3)] 
    table = pa.Table.from_arrays(data, ['a', 'b', 'c']) 
    os.makedirs(part, exist_ok=True) 
    out = ParquetWriter('{}/{}.parquet'.format(part, uuid4()), 
         table.schema, flavor='spark') 
    out.write_table(table) 
    out.close() 

を、それができない、私は常に列全体を読まなければなりません。私はpyarrowで次のことを試してみました:

動作しません
parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True]) 
parts2 = get_partitions('data', parts2) 
files = [glob('{}/*'.format(dirpath)) for dirpath in parts2] 
files = [i for i in chain.from_iterable(files)] 
df2 = ParquetDataset(files).read().to_pandas() 

いずれか:あなたは以下を参照することができたよう

def get_spark_session_ctx(appName): 
    """Get or create a Spark Session, and the underlying Context.""" 
    from pyspark.sql import SparkSession 
    spark = SparkSession.builder.appName(appName).getOrCreate() 
    sc = spark.sparkContext 
    return (spark, sc) 


spark, sc = get_spark_session_ctx('test') 
spark_df = spark.read.option('basePath', 'data').parquet(*parts2) 
df3 = spark_df.toPandas() 

>>> df2.columns 
Index(['a', 'b', 'c'], dtype='object') 

を私はこのようなpysparkで簡単に行うことができます:

>>> df3.columns 
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object') 

pyarrowまたはpandasでこれを行うことができますか、カスタム実装が必要ですか?

更新: Wesの要請により、今はJIRAです。

答えて

1

質問:分割された寄木細工のデータセットから特定のパーティションをpyarrowで読み取るにはどうすればよいですか?

回答:今はできません。

https://issues.apache.org/jiraにこの機能を要求するApache Arrow JIRAを作成できますか?

これはpyarrow APIでサポートできるはずのものですが、誰かに実装する必要があります。ありがとうございます

+0

私はそれを、ありがとう。私は赤と書き込みの両方の機能要求を作成する必要があります(私がそれを逃していない限り)。私は最近多くの自由時間を持っています。もし誰かが私を導くならば、私はその実装について作業することもできます。 – suvayu

関連する問題