
Overwrite (#553)
* Implementation of an 'overwrite' function.
This function is triggered through the 'write' function with a specific combination of parameters:
row_group_offsets = 0
file_scheme = 'hive'
partition_on initialized
append = True

* Typo corrections.

* Correction of case row_group_offsets = [0]

* Fix not properly finalized.

* Overwriting the part.0 file is no longer possible when several files already exist in the same partition folder.

* Removing a forgotten 'print'.

* Overwrite now requires append='overwrite'. Clarifying overwrite in docstring and quickstart.

* Improved description of the 'overwrite' feature.

* Forgotten improvement in description of the 'overwrite' feature.
yohplala authored Mar 2, 2021
1 parent cff0851 commit e037540
Showing 3 changed files with 130 additions and 20 deletions.
3 changes: 2 additions & 1 deletion docs/source/quickstart.rst
@@ -66,7 +66,8 @@ Further options that may be of interest are:

- the compression algorithms (typically "snappy", for fast, but not too space-efficient), which can vary by column
- the row-group splits to apply, which may lead to efficiencies on loading, if some row-groups can be skipped. Statistics (min/max) are calculated for each column in each row-group on the fly.
- multi-file saving can be enabled with the ``file_scheme`` keyword: hive-style output is a directory with a single metadata file and several data-files.
- multi-file saving can be enabled with ``file_scheme="hive"`` or ``"drill"``: directory-tree-partitioned output with a single metadata file and several data-files, one or more per leaf directory. The values used for partitioning are encoded into the paths of the data files.
- append to existing data sets with ``append=True``, adding new row-groups. For the specific case of "hive"-partitioned data and one file per partition, ``append="overwrite"`` is also allowed, which replaces partitions of the data where new data are given, but leaves other existing partitions untouched.
- options has_nulls, fixed_text and write_index affect efficiency; see the `api docs <./api.html#fastparquet.write>`_.

.. code-block:: python
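The `append="overwrite"` option added to the quickstart above can be exercised with a short, self-contained script along these lines (the directory name and columns are made up for the example):

    import pandas as pd
    from fastparquet import write, ParquetFile

    df = pd.DataFrame({'value': [1, 2, 3],
                       'location': ['Paris', 'Paris', 'Milan']})
    # Initial hive-partitioned write: one row group, hence one file, per partition.
    write('out_dir', df, row_group_offsets=0, file_scheme='hive',
          partition_on=['location'])

    # New data for 'Paris' only: replaces the 'Paris' partition, 'Milan' is untouched.
    df_new = pd.DataFrame({'value': [10, 20],
                           'location': ['Paris', 'Paris']})
    write('out_dir', df_new, row_group_offsets=0, file_scheme='hive',
          partition_on=['location'], append='overwrite')

    print(ParquetFile('out_dir').to_pandas())

Only the 'location=Paris' folder should be rewritten; 'location=Milan' keeps its original data file.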
61 changes: 61 additions & 0 deletions fastparquet/test/test_overwrite.py
@@ -0,0 +1,61 @@
"""
test_overwrite.py
Tests for overwriting parquet files.
"""

import pandas as pd
import pytest
from fastparquet import write, ParquetFile
from fastparquet.test.util import tempdir


def test_write_with_rgp_by_date_as_index(tempdir):

# Step 1 - Writing of a 1st df, with `row_group_offsets=0`,
# `file_scheme=hive` and `partition_on=['location', 'color']`.
df1 = pd.DataFrame({'humidity': [0.3, 0.8, 0.9],
'pressure': [1e5, 1.1e5, 0.95e5],
'location': ['Paris', 'Paris', 'Milan'],
'color': ['red', 'black', 'blue']})
write(tempdir, df1, row_group_offsets=0, file_scheme='hive',
partition_on=['location', 'color'])

# Step 2 - Overwriting with a 2nd df having overlapping data, in
# 'overwrite' mode:
# `row_group_offsets=0`, `file_scheme=hive`,
# `partition_on=['location', 'color']` and `append='overwrite'`.
df2 = pd.DataFrame({'humidity': [0.5, 0.3, 0.4, 0.8, 1.1],
'pressure': [9e4, 1e5, 1.1e5, 1.1e5, 0.95e5],
'location': ['Milan', 'Paris', 'Paris', 'Paris', 'Paris'],
'color': ['red', 'black', 'black', 'green', 'green']})

write(tempdir, df2, row_group_offsets=0, file_scheme='hive', append='overwrite',
partition_on=['location', 'color'])

expected = pd.DataFrame({'humidity': [0.9, 0.5, 0.3, 0.4, 0.8, 1.1, 0.3],
'pressure': [9.5e4, 9e4, 1e5, 1.1e5, 1.1e5, 9.5e4, 1e5],
'location': ['Milan', 'Milan', 'Paris', 'Paris', 'Paris', 'Paris', 'Paris'],
'color': ['blue', 'red', 'black', 'black', 'green', 'green', 'red']})\
.astype({'location': 'category', 'color': 'category'})
recorded = ParquetFile(tempdir).to_pandas()
# df1 is 3 rows, df2 is 5 rows. Because of the overlapping keys
# 'location' = 'Paris' & 'color' = 'black' (1 row in df1, 2 rows in df2),
# the resulting df contains, for this combination, the values of df2 and
# not those of df1. The total resulting number of rows is 7.
assert expected.equals(recorded)

def test_several_existing_parts_in_folder_exception(tempdir):

df1 = pd.DataFrame({'humidity': [0.3, 0.8, 0.9, 0.7],
'pressure': [1e5, 1.1e5, 0.95e5, 1e5],
'location': ['Paris', 'Paris', 'Milan', 'Paris'],
'exterior': ['yes', 'no', 'yes', 'yes']})

write(tempdir, df1, row_group_offsets=1, file_scheme='hive',
write_index=False, partition_on=['location', 'exterior'])

with pytest.raises(ValueError, match="^Some partition folders"):
write(tempdir, df1, row_group_offsets=0, file_scheme='hive',
write_index=False, partition_on=['location', 'exterior'],
append='overwrite')

86 changes: 67 additions & 19 deletions fastparquet/writer.py
@@ -5,6 +5,7 @@
import re
import struct
import warnings
from bisect import bisect

import numba
import numpy as np
@@ -814,15 +815,15 @@ def write(filename, data, row_group_offsets=50000000,
mkdirs=default_mkdirs, has_nulls=True, write_index=None,
partition_on=[], fixed_text=None, append=False,
object_encoding='infer', times='int64'):
""" Write Pandas DataFrame to filename as Parquet Format
""" Write Pandas DataFrame to filename as Parquet Format.
Parameters
----------
filename: string
Parquet collection to write to, either a single file (if file_scheme
is simple) or a directory containing the metadata and data-files.
data: pandas dataframe
The table to write
The table to write.
row_group_offsets: int or list of ints
If int, row-groups will be approximately this many rows, rounded down
to make row groups about the same size; if a list, the explicit index
@@ -858,10 +859,10 @@ def write(filename, data, row_group_offsets=50000000,
the compressor.
If the dictionary contains a "_default" entry, this will be used for any
columns not explicitly specified in the dictionary.
file_scheme: 'simple'|'hive'
file_scheme: 'simple'|'hive'|'drill'
If simple: all goes in a single file
If hive: each row group is in a separate file, and a separate file
(called "_metadata") contains the metadata.
If hive or drill: each row group is in a separate file, and a separate
file (called "_metadata") contains the metadata.
open_with: function
When called with a f(path, mode), returns an open file-like object
mkdirs: function
@@ -889,10 +890,17 @@ def write(filename, data, row_group_offsets=50000000,
before writing, potentially providing a large speed
boost. The length applies to the binary representation *after*
conversion for utf8, json or bson.
append: bool (False)
If False, construct data-set from scratch; if True, add new row-group(s)
to existing data-set. In the latter case, the data-set must exist,
and the schema must match the input data.
append: bool (False) or 'overwrite'
- If False, construct data-set from scratch; if True, add new row-group(s)
to existing data-set. In the latter case, the data-set must exist,
and the schema must match the input data.
- If 'overwrite', existing partitions will be replaced in place wherever
the given data contains rows belonging to that partition. To enable this,
the following parameters have to be set to specific values, or a
ValueError will be raised:
* ``row_group_offsets=0``
* ``file_scheme='hive'``
* ``partition_on`` has to be set to at least one column name
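Given a dataframe ``df`` with a 'location' column, an illustrative
call (the directory name here is only an example) could look like::

    write('out_dir', df, row_group_offsets=0, file_scheme='hive',
          partition_on=['location'], append='overwrite')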
object_encoding: str or {col: type}
For object columns, this gives the data type, so that the values can
be encoded to bytes. Possible values are bytes|utf8|json|bson|bool|int|int32|decimal,
@@ -913,10 +921,13 @@ def write(filename, data, row_group_offsets=50000000,
if str(has_nulls) == 'infer':
has_nulls = None
if isinstance(row_group_offsets, int):
l = len(data)
nparts = max((l - 1) // row_group_offsets + 1, 1)
chunksize = max(min((l - 1) // nparts + 1, l), 1)
row_group_offsets = list(range(0, l, chunksize))
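# Note: a falsy int (row_group_offsets=0) is taken to mean a single
# row group spanning the whole frame.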
if not row_group_offsets:
row_group_offsets = [0]
else:
l = len(data)
nparts = max((l - 1) // row_group_offsets + 1, 1)
chunksize = max(min((l - 1) // nparts + 1, l), 1)
row_group_offsets = list(range(0, l, chunksize))
if (write_index or write_index is None
and not isinstance(data.index, pd.RangeIndex)):
cols = set(data)
@@ -932,25 +943,38 @@ def write(filename, data, row_group_offsets=50000000,
ignore = partition_on if file_scheme != 'simple' else []
fmd = make_metadata(data, has_nulls=has_nulls, ignore_columns=ignore,
fixed_text=fixed_text, object_encoding=object_encoding,
times=times, index_cols=index_cols, partition_cols=partition_on)
times=times, index_cols=index_cols,
partition_cols=partition_on)

if file_scheme == 'simple':
write_simple(filename, data, fmd, row_group_offsets,
compression, open_with, has_nulls, append)
elif file_scheme in ['hive', 'drill']:
if append:
if append: # can be True or 'overwrite'
pf = api.ParquetFile(filename, open_with=open_with)
if pf.file_scheme not in ['hive', 'empty', 'flat']:
raise ValueError('Requested file scheme is %s, but '
'existing file scheme is not.' % file_scheme)
fmd = pf.fmd
i_offset = find_max_part(fmd.row_groups)
if tuple(partition_on) != tuple(pf.cats):
raise ValueError('When appending, partitioning columns must'
' match existing data')
if append == 'overwrite' and partition_on:
# Build list of 'path' from existing files
# (to have partition values).
exist_rgps = ['_'.join(rg.columns[0].file_path.split('/')[:-1])
for rg in fmd.row_groups]
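# e.g. a file_path such as 'location=Paris/color=red/part.0.parquet'
# gives the key 'location=Paris_color=red'.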
if len(exist_rgps) > len(set(exist_rgps)):
# Some groups are in the same folder (partition). This case
# is not handled.
raise ValueError("Some partition folders contain several \
part files. This situation is not allowed with use of `append='overwrite'`.")
i_offset = 0
else:
i_offset = find_max_part(fmd.row_groups)
else:
i_offset = 0
fn = join_path(filename, '_metadata')

mkdirs(filename)
for i, start in enumerate(row_group_offsets):
end = (row_group_offsets[i+1] if i < (len(row_group_offsets) - 1)
@@ -962,17 +986,41 @@ def write(filename, data, row_group_offsets=50000000,
compression, open_with, mkdirs,
with_field=file_scheme == 'hive'
)
fmd.row_groups.extend(rgs)
if append != 'overwrite':
# Append or 'standard' write mode.
fmd.row_groups.extend(rgs)
else:
# 'overwrite' mode -> update fmd in place.
# Get 'new' combinations of values from columns listed in
# 'partition_on', along with the corresponding row groups.
new_rgps = {'_'.join(rg.columns[0].file_path.split('/')[:-1]): rg
for rg in rgs}
for part_val in new_rgps:
if part_val in exist_rgps:
# Replace existing row group metadata with new ones.
row_group_index = exist_rgps.index(part_val)
fmd.row_groups[row_group_index] = new_rgps[part_val]
else:
# Insert new rg metadata among existing ones, preserving
# order (this assumes the existing list was sorted in the
# first place).
row_group_index = bisect(exist_rgps, part_val)
fmd.row_groups.insert(row_group_index, new_rgps[part_val])
# Keep 'exist_rgps' list representative for next 'replace'
# or 'insert' cases.
exist_rgps.insert(row_group_index, part_val)

else:
partname = join_path(filename, part)
with open_with(partname, 'wb') as f2:
rg = make_part_file(f2, data[start:end], fmd.schema,
compression=compression, fmd=fmd)
for chunk in rg.columns:
chunk.file_path = part

fmd.row_groups.append(rg)

fmd.num_rows = sum(rg.num_rows for rg in fmd.row_groups)
fn = join_path(filename, '_metadata')
write_common_metadata(fn, fmd, open_with, no_row_groups=False)
write_common_metadata(join_path(filename, '_common_metadata'), fmd,
open_with)
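To make the row-group bookkeeping above easier to follow, here is a small self-contained sketch of the replace-or-insert logic that `append='overwrite'` applies, using made-up partition keys and plain strings in place of the row-group metadata objects:

    from bisect import bisect

    # Existing partitions (one row group each), keyed by their path-derived
    # value, in the same order as the metadata's row-group list.
    exist_rgps = ['location=Milan_color=blue',
                  'location=Paris_color=black',
                  'location=Paris_color=red']
    row_groups = ['rg0', 'rg1', 'rg2']        # stand-ins for row-group metadata

    # Row groups produced by the new write call, keyed the same way.
    new_rgps = {'location=Paris_color=black': 'rg1_new',   # overlap -> replace
                'location=Paris_color=green': 'rg3_new'}   # new key -> insert

    for part_val, rg in new_rgps.items():
        if part_val in exist_rgps:
            # Partition already present: replace its row-group metadata in place.
            row_groups[exist_rgps.index(part_val)] = rg
        else:
            # Unseen partition: insert so that both lists stay aligned and sorted.
            i = bisect(exist_rgps, part_val)
            row_groups.insert(i, rg)
            exist_rgps.insert(i, part_val)

    print(row_groups)   # ['rg0', 'rg1_new', 'rg3_new', 'rg2']

Overlapping partitions keep their position in the metadata, while genuinely new partitions are slotted in with bisect so the key list stays sorted.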
