私は、寄木細工の形式でやや大きな(約20 GB)パーティションデータセットを持っています。 pyarrow
を使用して、データセットから特定のパーティションを読みたいと思います。私はpyarrow.parquet.ParquetDataset
でこれを達成できると思ったが、それはそうではないようだ。ここに、私が欲しいものを説明するための小さな例があります。pyarrowで区切られた寄木細工のデータセットから特定の区画を読み取る
ランダムデータセットを作成するには:私は、パーティション1のすべての値を読み、そしてpandas.read_parquet
とパーティション2のための唯一の真たい
from collections import OrderedDict
from itertools import product, chain
from uuid import uuid4
import os
from glob import glob
import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow.parquet import ParquetWriter, ParquetDataset
def get_partitions(basepath, partitions):
"""Generate directory hierarchy for a paritioned dataset
data
├── part1=foo
│ └── part2=True
├── part1=foo
│ └── part2=False
├── part1=bar
│ └── part2=True
└── part1=bar
└── part2=False
"""
path_tmpl = '/'.join(['{}={}'] * len(partitions)) # part=value
path_tmpl = '{}/{}'.format(basepath, path_tmpl) # part1=val/part2=val
parts = [product([part], vals) for part, vals in partitions.items()]
parts = [i for i in product(*parts)]
return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts]
partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False])
parts = get_partitions('data', partitions)
for part in parts:
# 3 columns, 5 rows
data = [pa.array(np.random.rand(5)) for i in range(3)]
table = pa.Table.from_arrays(data, ['a', 'b', 'c'])
os.makedirs(part, exist_ok=True)
out = ParquetWriter('{}/{}.parquet'.format(part, uuid4()),
table.schema, flavor='spark')
out.write_table(table)
out.close()
を、それができない、私は常に列全体を読まなければなりません。私はpyarrow
で次のことを試してみました:
parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True])
parts2 = get_partitions('data', parts2)
files = [glob('{}/*'.format(dirpath)) for dirpath in parts2]
files = [i for i in chain.from_iterable(files)]
df2 = ParquetDataset(files).read().to_pandas()
いずれか:あなたは以下を参照することができたよう
def get_spark_session_ctx(appName):
"""Get or create a Spark Session, and the underlying Context."""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(appName).getOrCreate()
sc = spark.sparkContext
return (spark, sc)
spark, sc = get_spark_session_ctx('test')
spark_df = spark.read.option('basePath', 'data').parquet(*parts2)
df3 = spark_df.toPandas()
:
>>> df2.columns
Index(['a', 'b', 'c'], dtype='object')
を私はこのようなpyspark
で簡単に行うことができます:
>>> df3.columns
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object')
pyarrow
またはpandas
でこれを行うことができますか、カスタム実装が必要ですか?
更新: Wesの要請により、今はJIRAです。
私はそれを、ありがとう。私は赤と書き込みの両方の機能要求を作成する必要があります(私がそれを逃していない限り)。私は最近多くの自由時間を持っています。もし誰かが私を導くならば、私はその実装について作業することもできます。 – suvayu