
ENH: support non default indexes in writing to Parquet #18629

Merged
5 changes: 1 addition & 4 deletions doc/source/io.rst
@@ -4504,11 +4504,8 @@ dtypes, including extension dtypes such as datetime with tz.

Several caveats.

- The format will NOT write an ``Index``, or ``MultiIndex`` for the
``DataFrame`` and will raise an error if a non-default one is provided. You
can ``.reset_index()`` to store the index or ``.reset_index(drop=True)`` to
ignore it.
- Duplicate column names and non-string column names are not supported
- Index level names, if specified, must be strings
- Categorical dtypes can be serialized to parquet, but will de-serialize as ``object`` dtype.
- Unsupported types include ``Period`` and actual Python object types. These will raise a helpful error message
  on an attempt at serialization.
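For illustration, a minimal sketch of how some of these caveats surface in practice; the frame contents and file names are hypothetical:

```python
import pandas as pd

# Categorical data can be written, but comes back as object dtype.
df = pd.DataFrame({"a": pd.Categorical(["x", "y", "x"])})
df.to_parquet("cats.parquet")
pd.read_parquet("cats.parquet")["a"].dtype  # object, not category

# Non-string column names are rejected up front.
bad = pd.DataFrame({0: [1, 2], 1: [3, 4]})
try:
    bad.to_parquet("bad.parquet")
except ValueError as err:
    print(err)  # parquet must have string column names
```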
5 changes: 3 additions & 2 deletions doc/source/whatsnew/v0.21.1.txt
@@ -61,6 +61,9 @@ New features
Improvements to the Parquet IO functionality
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- :func:`DataFrame.to_parquet` will now write non-default indexes when the
  underlying engine supports it. The indexes will be preserved when reading
  back in with :func:`read_parquet` (:issue:`18581`); see the sketch after
  this list.
- :func:`read_parquet` now allows specifying the columns to read from a parquet file (:issue:`18154`)
- :func:`read_parquet` now allows specifying kwargs that are passed to the respective engine (:issue:`18216`)
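A minimal sketch of the index round trip described in the first bullet above, assuming an engine recent enough to carry the pandas index metadata (e.g. pyarrow >= 0.7.0); the file name is illustrative:

```python
import pandas as pd

# Hypothetical illustration: with a sufficiently recent engine, a non-default
# index survives the to_parquet / read_parquet round trip.
df = pd.DataFrame({"a": [1, 2, 3]},
                  index=pd.Index([10, 20, 30], name="id"))
df.to_parquet("example.parquet", engine="pyarrow")

result = pd.read_parquet("example.parquet", engine="pyarrow")
assert result.index.name == "id"
assert list(result.index) == [10, 20, 30]
```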

@@ -91,8 +94,6 @@ Performance Improvements

Bug Fixes
~~~~~~~~~
-


Conversion
^^^^^^^^^^
215 changes: 127 additions & 88 deletions pandas/io/parquet.py
@@ -3,7 +3,8 @@
from warnings import catch_warnings
from distutils.version import LooseVersion
from pandas import DataFrame, RangeIndex, Int64Index, get_option
from pandas.compat import range
from pandas.compat import string_types
from pandas.core.common import AbstractMethodError
from pandas.io.common import get_filepath_or_buffer


@@ -34,82 +35,155 @@ def get_engine(engine):
return FastParquetImpl()


class PyArrowImpl(object):
class BaseImpl(object):

api = None # module

@staticmethod
def validate_dataframe(df):
if not isinstance(df, DataFrame):
raise ValueError("to_parquet only supports IO with DataFrames")
# must have value column names (strings only)
Review comment (Contributor): can you add a blank line before comments, easier to read
if df.columns.inferred_type not in {'string', 'unicode'}:
raise ValueError("parquet must have string column names")
Review comment (Member): We should also test the index level names to be strings (if not None, and if written)

# index level names must be strings
valid_names = all(
isinstance(name, string_types)
for name in df.index.names
if name is not None
)
if not valid_names:
raise ValueError("Index level names must be strings")

def write(self, df, path, compression, **kwargs):
raise AbstractMethodError(self)

def read(self, path, columns=None, **kwargs):
raise AbstractMethodError(self)


class PyArrowImpl(BaseImpl):

def __init__(self):
# since pandas is a dependency of pyarrow
# we need to import on first use

try:
import pyarrow
import pyarrow.parquet
except ImportError:
raise ImportError("pyarrow is required for parquet support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n")

if LooseVersion(pyarrow.__version__) < LooseVersion('0.4.1'):
raise ImportError("pyarrow >= 0.4.1 is required for parquet"
"support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n")

self._pyarrow_lt_050 = (LooseVersion(pyarrow.__version__) <
LooseVersion('0.5.0'))
self._pyarrow_lt_060 = (LooseVersion(pyarrow.__version__) <
LooseVersion('0.6.0'))
raise ImportError(
"pyarrow is required for parquet support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n"
)
if LooseVersion(pyarrow.__version__) < '0.4.1':
raise ImportError(
"pyarrow >= 0.4.1 is required for parquet support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n"
)
self._pyarrow_lt_070 = (
LooseVersion(pyarrow.__version__) < LooseVersion('0.7.0')
)
self.api = pyarrow

def write(self, df, path, compression='snappy',
coerce_timestamps='ms', **kwargs):
self.validate_dataframe(df)
if self._pyarrow_lt_070:
self._validate_write_lt_070(
df, path, compression, coerce_timestamps, **kwargs
Review comment (Member): Is there a reason you pass path, compression, coerce_timestamps here?

Reply (Contributor Author): Just a leftover from a refactoring. Should be removed as they're no longer needed.
)
path, _, _ = get_filepath_or_buffer(path)
if self._pyarrow_lt_060:
table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
self.api.parquet.write_table(
table, path, compression=compression, **kwargs)
Review comment (Member): Do we need to keep this for older pyarrow versions?
else:
table = self.api.Table.from_pandas(df)
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)
table = self.api.Table.from_pandas(df)
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)

def read(self, path, columns=None, **kwargs):
path, _, _ = get_filepath_or_buffer(path)
return self.api.parquet.read_table(path, columns=columns,
**kwargs).to_pandas()


class FastParquetImpl(object):
parquet_file = self.api.parquet.ParquetFile(path)
if self._pyarrow_lt_070:
parquet_file.path = path
Review comment (Member): I would just pass path to the _read_lt_070 function?
return self._read_lt_070(parquet_file, columns, **kwargs)
kwargs['use_pandas_metadata'] = True
return parquet_file.read(columns=columns, **kwargs).to_pandas()

def _validate_write_lt_070(self, df, path, compression='snappy',
coerce_timestamps='ms', **kwargs):
# Compatibility shim for pyarrow < 0.7.0
# TODO: Remove in pandas 0.22.0
from pandas.core.indexes.multi import MultiIndex
if isinstance(df.index, MultiIndex):
msg = (
"Mulit-index DataFrames are only supported "
"with pyarrow >= 0.7.0"
)
raise ValueError(msg)
# Validate index
if not isinstance(df.index, Int64Index):
msg = (
"parquet does not support serializing {} for the index;"
"you can .reset_index() to make the index into column(s)"
Review comment (Member): maybe we should add here that they can also install latest pyarrow or fastparquet version (same for the others)
)
raise ValueError(msg.format(type(df.index)))
if not df.index.equals(RangeIndex(len(df))):
raise ValueError(
"parquet does not support serializing a non-default index "
"for the index; you can .reset_index() to make the index "
"into column(s)"
)
if df.index.name is not None:
raise ValueError(
"parquet does not serialize index meta-data "
"on a default index"
)

def _read_lt_070(self, parquet_file, columns, **kwargs):
# Compatibility shim for pyarrow < 0.7.0
# TODO: Remove in pandas 0.22.0
from itertools import chain
import json
if columns is not None:
metadata = json.loads(parquet_file.metadata.metadata[b'pandas'])
columns = set(chain(columns, metadata['index_columns']))
kwargs['columns'] = columns
kwargs['path'] = parquet_file.path
return self.api.parquet.read_table(**kwargs).to_pandas()


class FastParquetImpl(BaseImpl):

def __init__(self):
# since pandas is a dependency of fastparquet
# we need to import on first use

try:
import fastparquet
except ImportError:
raise ImportError("fastparquet is required for parquet support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet")

if LooseVersion(fastparquet.__version__) < LooseVersion('0.1.0'):
raise ImportError("fastparquet >= 0.1.0 is required for parquet "
"support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet")

raise ImportError(
"fastparquet is required for parquet support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet"
)
if LooseVersion(fastparquet.__version__) < '0.1.0':
raise ImportError(
"fastparquet >= 0.1.0 is required for parquet "
"support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet"
)
self.api = fastparquet

def write(self, df, path, compression='snappy', **kwargs):
self.validate_dataframe(df)
# thriftpy/protocol/compact.py:339:
# DeprecationWarning: tostring() is deprecated.
# Use tobytes() instead.
@@ -120,7 +194,8 @@ def write(self, df, path, compression='snappy', **kwargs):

def read(self, path, columns=None, **kwargs):
path, _, _ = get_filepath_or_buffer(path)
return self.api.ParquetFile(path).to_pandas(columns=columns, **kwargs)
parquet_file = self.api.ParquetFile(path)
return parquet_file.to_pandas(columns=columns, **kwargs)
Review comment (Contributor Author, @dhirschfeld, Dec 7, 2017): For fastparquet I haven't bothered reading the metadata to pull out the index columns since even if you do you still hit the NotImplementedError. I figure if fastparquet does add support for multi-indexes they might implement a solution similar to pyarrow so that the user doesn't have to introspect the metadata to reconstruct the correct DataFrame.
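For context, a rough sketch of the pyarrow-style behaviour the comment refers to (assuming pyarrow >= 0.7.0; the names and file path are illustrative):

```python
import pandas as pd

# pyarrow stores the index columns in the b'pandas' file metadata, so
# read_parquet can rebuild the MultiIndex without the caller having to
# introspect anything.
idx = pd.MultiIndex.from_product([["a", "b"], [1, 2]],
                                 names=["outer", "inner"])
df = pd.DataFrame({"val": range(4)}, index=idx)

df.to_parquet("mi.parquet", engine="pyarrow")
back = pd.read_parquet("mi.parquet", engine="pyarrow")
assert list(back.index.names) == ["outer", "inner"]
```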



def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
@@ -141,43 +216,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
kwargs
Additional keyword arguments passed to the engine
"""

impl = get_engine(engine)

if not isinstance(df, DataFrame):
raise ValueError("to_parquet only support IO with DataFrames")

valid_types = {'string', 'unicode'}

# validate index
# --------------

# validate that we have only a default index
# raise on anything else as we don't serialize the index

if not isinstance(df.index, Int64Index):
raise ValueError("parquet does not support serializing {} "
"for the index; you can .reset_index()"
"to make the index into column(s)".format(
type(df.index)))

if not df.index.equals(RangeIndex.from_range(range(len(df)))):
raise ValueError("parquet does not support serializing a "
"non-default index for the index; you "
"can .reset_index() to make the index "
"into column(s)")

if df.index.name is not None:
raise ValueError("parquet does not serialize index meta-data on a "
"default index")

# validate columns
# ----------------

# must have value column names (strings only)
if df.columns.inferred_type not in valid_types:
raise ValueError("parquet must have string column names")

return impl.write(df, path, compression=compression, **kwargs)

