ENH: support non default indexes in writing to Parquet (#18629)
fastparquet automatically names an index 'index' if it doesn't
already have a name
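
For context, a minimal sketch of that fastparquet behaviour (a hypothetical example assuming fastparquet >= 0.1.0 is installed; the file name is illustrative):

    import pandas as pd

    df = pd.DataFrame({'a': [1, 2, 3]}, index=[10, 20, 30])   # non-default, unnamed index
    df.to_parquet('unnamed.parquet', engine='fastparquet',
                  compression='gzip')                          # gzip avoids the optional snappy dependency
    # fastparquet stores the unnamed index under the name 'index', so the
    # round-trip index comes back named 'index' rather than None
    pd.read_parquet('unnamed.parquet', engine='fastparquet').index.name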
dhirschfeld authored and jorisvandenbossche committed Dec 11, 2017
1 parent 2aa4aa9 commit 8d7e876
Showing 4 changed files with 191 additions and 116 deletions.
5 changes: 1 addition & 4 deletions doc/source/io.rst
@@ -4504,11 +4504,8 @@ dtypes, including extension dtypes such as datetime with tz.

Several caveats.

- The format will NOT write an ``Index``, or ``MultiIndex`` for the
  ``DataFrame`` and will raise an error if a non-default one is provided. You
  can ``.reset_index()`` to store the index or ``.reset_index(drop=True)`` to
  ignore it.
- Duplicate column names and non-string column names are not supported.
- Index level names, if specified, must be strings
- Categorical dtypes can be serialized to parquet, but will de-serialize as ``object`` dtype.
- Unsupported types include ``Period`` and actual Python object types. These will raise a helpful error message
  on an attempt at serialization.
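
A short sketch illustrating the caveats above (a hypothetical example assuming a parquet engine such as pyarrow >= 0.7.0 is installed; file names are illustrative)::

    import pandas as pd

    df = pd.DataFrame({'a': [1, 2, 3],
                       'b': pd.Categorical(['x', 'y', 'x'])},
                      index=pd.Index([10, 20, 30], name='id'))

    # string column names and a string index level name: supported
    df.to_parquet('ok.parquet')
    pd.read_parquet('ok.parquet')['b'].dtype      # comes back as object, not category

    # non-string column names raise a ValueError
    df.rename(columns={'a': 0}).to_parquet('bad.parquet')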
5 changes: 3 additions & 2 deletions doc/source/whatsnew/v0.21.1.txt
@@ -61,6 +61,9 @@ New features
Improvements to the Parquet IO functionality
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- :func:`DataFrame.to_parquet` will now write non-default indexes when the
  underlying engine supports it. The indexes will be preserved when reading
  back in with :func:`read_parquet` (:issue:`18581`), as sketched below.
- :func:`read_parquet` now allows specifying the columns to read from a parquet file (:issue:`18154`)
- :func:`read_parquet` now allows passing keyword arguments through to the respective engine (:issue:`18216`)
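
A minimal sketch of the index round-trip described above (the path and data are illustrative; a ``MultiIndex`` requires pyarrow >= 0.7.0)::

    import pandas as pd

    df = pd.DataFrame({'value': [1.0, 2.0]},
                      index=pd.MultiIndex.from_tuples([('a', 1), ('b', 2)],
                                                      names=['key', 'num']))
    df.to_parquet('example.parquet', engine='pyarrow')
    result = pd.read_parquet('example.parquet', engine='pyarrow')
    result.index.names                            # ['key', 'num'] is preserved

    # column subsetting is also supported when reading
    pd.read_parquet('example.parquet', engine='pyarrow', columns=['value'])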

@@ -91,8 +94,6 @@ Performance Improvements

Bug Fixes
~~~~~~~~~
-


Conversion
^^^^^^^^^^
191 changes: 116 additions & 75 deletions pandas/io/parquet.py
@@ -3,7 +3,8 @@
from warnings import catch_warnings
from distutils.version import LooseVersion
from pandas import DataFrame, RangeIndex, Int64Index, get_option
from pandas.compat import range
from pandas.compat import string_types
from pandas.core.common import AbstractMethodError
from pandas.io.common import get_filepath_or_buffer


@@ -39,39 +40,75 @@ def get_engine(engine):
return FastParquetImpl()


class PyArrowImpl(object):
class BaseImpl(object):

api = None # module

@staticmethod
def validate_dataframe(df):

if not isinstance(df, DataFrame):
raise ValueError("to_parquet only supports IO with DataFrames")

# must have value column names (strings only)
if df.columns.inferred_type not in {'string', 'unicode'}:
raise ValueError("parquet must have string column names")

# index level names must be strings
valid_names = all(
isinstance(name, string_types)
for name in df.index.names
if name is not None
)
if not valid_names:
raise ValueError("Index level names must be strings")

def write(self, df, path, compression, **kwargs):
raise AbstractMethodError(self)

def read(self, path, columns=None, **kwargs):
raise AbstractMethodError(self)


class PyArrowImpl(BaseImpl):

def __init__(self):
# since pandas is a dependency of pyarrow
# we need to import on first use

try:
import pyarrow
import pyarrow.parquet
except ImportError:
raise ImportError("pyarrow is required for parquet support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n")

if LooseVersion(pyarrow.__version__) < LooseVersion('0.4.1'):
raise ImportError("pyarrow >= 0.4.1 is required for parquet"
"support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n")

self._pyarrow_lt_050 = (LooseVersion(pyarrow.__version__) <
LooseVersion('0.5.0'))
self._pyarrow_lt_060 = (LooseVersion(pyarrow.__version__) <
LooseVersion('0.6.0'))
raise ImportError(
"pyarrow is required for parquet support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n"
)
if LooseVersion(pyarrow.__version__) < '0.4.1':
raise ImportError(
"pyarrow >= 0.4.1 is required for parquet support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n"
)

self._pyarrow_lt_060 = (
LooseVersion(pyarrow.__version__) < LooseVersion('0.6.0'))
self._pyarrow_lt_070 = (
LooseVersion(pyarrow.__version__) < LooseVersion('0.7.0'))

self.api = pyarrow

def write(self, df, path, compression='snappy',
coerce_timestamps='ms', **kwargs):
self.validate_dataframe(df)
if self._pyarrow_lt_070:
self._validate_write_lt_070(df)
path, _, _ = get_filepath_or_buffer(path)

if self._pyarrow_lt_060:
table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
self.api.parquet.write_table(
@@ -85,36 +122,75 @@ def write(self, df, path, compression='snappy',

def read(self, path, columns=None, **kwargs):
path, _, _ = get_filepath_or_buffer(path)
if self._pyarrow_lt_070:
return self.api.parquet.read_pandas(path, columns=columns,
**kwargs).to_pandas()
kwargs['use_pandas_metadata'] = True
return self.api.parquet.read_table(path, columns=columns,
**kwargs).to_pandas()


class FastParquetImpl(object):
def _validate_write_lt_070(self, df):
# Compatibility shim for pyarrow < 0.7.0
# TODO: Remove in pandas 0.22.0
from pandas.core.indexes.multi import MultiIndex
if isinstance(df.index, MultiIndex):
msg = (
"Multi-index DataFrames are only supported "
"with pyarrow >= 0.7.0"
)
raise ValueError(msg)
# Validate index
if not isinstance(df.index, Int64Index):
msg = (
"pyarrow < 0.7.0 does not support serializing {} for the "
"index; you can .reset_index() to make the index into "
"column(s), or install the latest version of pyarrow or "
"fastparquet."
)
raise ValueError(msg.format(type(df.index)))
if not df.index.equals(RangeIndex(len(df))):
raise ValueError(
"pyarrow < 0.7.0 does not support serializing a non-default "
"index; you can .reset_index() to make the index into "
"column(s), or install the latest version of pyarrow or "
"fastparquet."
)
if df.index.name is not None:
raise ValueError(
"pyarrow < 0.7.0 does not serialize indexes with a name; you "
"can set the index.name to None or install the latest version "
"of pyarrow or fastparquet."
)


class FastParquetImpl(BaseImpl):

def __init__(self):
# since pandas is a dependency of fastparquet
# we need to import on first use

try:
import fastparquet
except ImportError:
raise ImportError("fastparquet is required for parquet support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet")

if LooseVersion(fastparquet.__version__) < LooseVersion('0.1.0'):
raise ImportError("fastparquet >= 0.1.0 is required for parquet "
"support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet")

raise ImportError(
"fastparquet is required for parquet support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet"
)
if LooseVersion(fastparquet.__version__) < '0.1.0':
raise ImportError(
"fastparquet >= 0.1.0 is required for parquet "
"support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet"
)
self.api = fastparquet

def write(self, df, path, compression='snappy', **kwargs):
self.validate_dataframe(df)
# thriftpy/protocol/compact.py:339:
# DeprecationWarning: tostring() is deprecated.
# Use tobytes() instead.
@@ -125,7 +201,8 @@ def write(self, df, path, compression='snappy', **kwargs):

def read(self, path, columns=None, **kwargs):
path, _, _ = get_filepath_or_buffer(path)
return self.api.ParquetFile(path).to_pandas(columns=columns, **kwargs)
parquet_file = self.api.ParquetFile(path)
return parquet_file.to_pandas(columns=columns, **kwargs)


def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
@@ -146,43 +223,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
kwargs
Additional keyword arguments passed to the engine
"""

impl = get_engine(engine)

if not isinstance(df, DataFrame):
raise ValueError("to_parquet only support IO with DataFrames")

valid_types = {'string', 'unicode'}

# validate index
# --------------

# validate that we have only a default index
# raise on anything else as we don't serialize the index

if not isinstance(df.index, Int64Index):
raise ValueError("parquet does not support serializing {} "
"for the index; you can .reset_index()"
"to make the index into column(s)".format(
type(df.index)))

if not df.index.equals(RangeIndex.from_range(range(len(df)))):
raise ValueError("parquet does not support serializing a "
"non-default index for the index; you "
"can .reset_index() to make the index "
"into column(s)")

if df.index.name is not None:
raise ValueError("parquet does not serialize index meta-data on a "
"default index")

# validate columns
# ----------------

# must have value column names (strings only)
if df.columns.inferred_type not in valid_types:
raise ValueError("parquet must have string column names")

return impl.write(df, path, compression=compression, **kwargs)


