diff --git a/doc/source/io.rst b/doc/source/io.rst
index 54e7a11c5f2b1..2cbd64bf5186b 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -4504,11 +4504,8 @@ dtypes, including extension dtypes such as datetime with tz.
 
 Several caveats.
 
-- The format will NOT write an ``Index``, or ``MultiIndex`` for the
-  ``DataFrame`` and will raise an error if a non-default one is provided. You
-  can ``.reset_index()`` to store the index or ``.reset_index(drop=True)`` to
-  ignore it.
 - Duplicate column names and non-string columns names are not supported
+- Index level names, if specified, must be strings
 - Categorical dtypes can be serialized to parquet, but will de-serialize as
   ``object`` dtype.
 - Non supported types include ``Period`` and actual python object types. These
   will raise a helpful error message on an attempt at serialization.
diff --git a/doc/source/whatsnew/v0.21.1.txt b/doc/source/whatsnew/v0.21.1.txt
index 31902c98d0b6c..1b709cfd4aa90 100644
--- a/doc/source/whatsnew/v0.21.1.txt
+++ b/doc/source/whatsnew/v0.21.1.txt
@@ -61,6 +61,9 @@ New features
 Improvements to the Parquet IO functionality
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
+- :func:`DataFrame.to_parquet` will now write non-default indexes when the
+  underlying engine supports it. The indexes will be preserved when reading
+  back in with :func:`read_parquet` (:issue:`18581`).
 - :func:`read_parquet` now allows to specify the columns to read from a parquet file (:issue:`18154`)
 - :func:`read_parquet` now allows to specify kwargs which are passed to the respective engine (:issue:`18216`)
 
@@ -91,8 +94,6 @@ Performance Improvements
 
 Bug Fixes
 ~~~~~~~~~
 
--
-
 Conversion
 ^^^^^^^^^^
diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 7827c3ae04d4d..aa5dd821f5980 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -3,7 +3,8 @@
 from warnings import catch_warnings
 from distutils.version import LooseVersion
 from pandas import DataFrame, RangeIndex, Int64Index, get_option
-from pandas.compat import range
+from pandas.compat import string_types
+from pandas.core.common import AbstractMethodError
 from pandas.io.common import get_filepath_or_buffer
 
 
@@ -34,39 +35,75 @@ def get_engine(engine):
         return FastParquetImpl()
 
 
-class PyArrowImpl(object):
+class BaseImpl(object):
+
+    api = None  # module
+
+    @staticmethod
+    def validate_dataframe(df):
+
+        if not isinstance(df, DataFrame):
+            raise ValueError("to_parquet only supports IO with DataFrames")
+
+        # must have value column names (strings only)
+        if df.columns.inferred_type not in {'string', 'unicode'}:
+            raise ValueError("parquet must have string column names")
+
+        # index level names must be strings
+        valid_names = all(
+            isinstance(name, string_types)
+            for name in df.index.names
+            if name is not None
+        )
+        if not valid_names:
+            raise ValueError("Index level names must be strings")
+
+    def write(self, df, path, compression, **kwargs):
+        raise AbstractMethodError(self)
+
+    def read(self, path, columns=None, **kwargs):
+        raise AbstractMethodError(self)
+
+
+class PyArrowImpl(BaseImpl):
 
     def __init__(self):
         # since pandas is a dependency of pyarrow
         # we need to import on first use
-
         try:
             import pyarrow
             import pyarrow.parquet
         except ImportError:
-            raise ImportError("pyarrow is required for parquet support\n\n"
-                              "you can install via conda\n"
-                              "conda install pyarrow -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U pyarrow\n")
-
-        if LooseVersion(pyarrow.__version__) < LooseVersion('0.4.1'):
-            raise ImportError("pyarrow >= 0.4.1 is required for parquet"
"support\n\n" - "you can install via conda\n" - "conda install pyarrow -c conda-forge\n" - "\nor via pip\n" - "pip install -U pyarrow\n") - - self._pyarrow_lt_050 = (LooseVersion(pyarrow.__version__) < - LooseVersion('0.5.0')) - self._pyarrow_lt_060 = (LooseVersion(pyarrow.__version__) < - LooseVersion('0.6.0')) + raise ImportError( + "pyarrow is required for parquet support\n\n" + "you can install via conda\n" + "conda install pyarrow -c conda-forge\n" + "\nor via pip\n" + "pip install -U pyarrow\n" + ) + if LooseVersion(pyarrow.__version__) < '0.4.1': + raise ImportError( + "pyarrow >= 0.4.1 is required for parquet support\n\n" + "you can install via conda\n" + "conda install pyarrow -c conda-forge\n" + "\nor via pip\n" + "pip install -U pyarrow\n" + ) + + self._pyarrow_lt_060 = ( + LooseVersion(pyarrow.__version__) < LooseVersion('0.6.0')) + self._pyarrow_lt_070 = ( + LooseVersion(pyarrow.__version__) < LooseVersion('0.7.0')) + self.api = pyarrow def write(self, df, path, compression='snappy', coerce_timestamps='ms', **kwargs): + self.validate_dataframe(df) + if self._pyarrow_lt_070: + self._validate_write_lt_070(df) path, _, _ = get_filepath_or_buffer(path) + if self._pyarrow_lt_060: table = self.api.Table.from_pandas(df, timestamps_to_ms=True) self.api.parquet.write_table( @@ -80,36 +117,75 @@ def write(self, df, path, compression='snappy', def read(self, path, columns=None, **kwargs): path, _, _ = get_filepath_or_buffer(path) + if self._pyarrow_lt_070: + return self.api.parquet.read_pandas(path, columns=columns, + **kwargs).to_pandas() + kwargs['use_pandas_metadata'] = True return self.api.parquet.read_table(path, columns=columns, **kwargs).to_pandas() - -class FastParquetImpl(object): + def _validate_write_lt_070(self, df): + # Compatibility shim for pyarrow < 0.7.0 + # TODO: Remove in pandas 0.22.0 + from pandas.core.indexes.multi import MultiIndex + if isinstance(df.index, MultiIndex): + msg = ( + "Multi-index DataFrames are only supported " + "with pyarrow >= 0.7.0" + ) + raise ValueError(msg) + # Validate index + if not isinstance(df.index, Int64Index): + msg = ( + "pyarrow < 0.7.0 does not support serializing {} for the " + "index; you can .reset_index() to make the index into " + "column(s), or install the latest version of pyarrow or " + "fastparquet." + ) + raise ValueError(msg.format(type(df.index))) + if not df.index.equals(RangeIndex(len(df))): + raise ValueError( + "pyarrow < 0.7.0 does not support serializing a non-default " + "index; you can .reset_index() to make the index into " + "column(s), or install the latest version of pyarrow or " + "fastparquet." + ) + if df.index.name is not None: + raise ValueError( + "pyarrow < 0.7.0 does not serialize indexes with a name; you " + "can set the index.name to None or install the latest version " + "of pyarrow or fastparquet." 
+            )
+
+
+class FastParquetImpl(BaseImpl):
 
     def __init__(self):
         # since pandas is a dependency of fastparquet
         # we need to import on first use
-
         try:
             import fastparquet
         except ImportError:
-            raise ImportError("fastparquet is required for parquet support\n\n"
-                              "you can install via conda\n"
-                              "conda install fastparquet -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U fastparquet")
-
-        if LooseVersion(fastparquet.__version__) < LooseVersion('0.1.0'):
-            raise ImportError("fastparquet >= 0.1.0 is required for parquet "
-                              "support\n\n"
-                              "you can install via conda\n"
-                              "conda install fastparquet -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U fastparquet")
-
+            raise ImportError(
+                "fastparquet is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install fastparquet -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U fastparquet"
+            )
+        if LooseVersion(fastparquet.__version__) < '0.1.0':
+            raise ImportError(
+                "fastparquet >= 0.1.0 is required for parquet "
+                "support\n\n"
+                "you can install via conda\n"
+                "conda install fastparquet -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U fastparquet"
+            )
         self.api = fastparquet
 
     def write(self, df, path, compression='snappy', **kwargs):
+        self.validate_dataframe(df)
         # thriftpy/protocol/compact.py:339:
         # DeprecationWarning: tostring() is deprecated.
         # Use tobytes() instead.
@@ -120,7 +196,8 @@ def write(self, df, path, compression='snappy', **kwargs):
 
     def read(self, path, columns=None, **kwargs):
         path, _, _ = get_filepath_or_buffer(path)
-        return self.api.ParquetFile(path).to_pandas(columns=columns, **kwargs)
+        parquet_file = self.api.ParquetFile(path)
+        return parquet_file.to_pandas(columns=columns, **kwargs)
 
 
 def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
@@ -141,43 +218,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
     kwargs
         Additional keyword arguments passed to the engine
     """
-
     impl = get_engine(engine)
-
-    if not isinstance(df, DataFrame):
-        raise ValueError("to_parquet only support IO with DataFrames")
-
-    valid_types = {'string', 'unicode'}
-
-    # validate index
-    # --------------
-
-    # validate that we have only a default index
-    # raise on anything else as we don't serialize the index
-
-    if not isinstance(df.index, Int64Index):
-        raise ValueError("parquet does not support serializing {} "
-                         "for the index; you can .reset_index()"
-                         "to make the index into column(s)".format(
-                             type(df.index)))
-
-    if not df.index.equals(RangeIndex.from_range(range(len(df)))):
-        raise ValueError("parquet does not support serializing a "
-                         "non-default index for the index; you "
-                         "can .reset_index() to make the index "
-                         "into column(s)")
-
-    if df.index.name is not None:
-        raise ValueError("parquet does not serialize index meta-data on a "
-                         "default index")
-
-    # validate columns
-    # ----------------
-
-    # must have value column names (strings only)
-    if df.columns.inferred_type not in valid_types:
-        raise ValueError("parquet must have string column names")
-
     return impl.write(df, path, compression=compression, **kwargs)
diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 8e6f8998f5eeb..c59acbd946f91 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -207,15 +207,14 @@ def test_cross_engine_fp_pa(df_cross_compat, pa, fp):
 class Base(object):
 
     def check_error_on_write(self, df, engine, exc):
-        # check that we are raising the exception
-        # on writing
-
+        # check that we are raising the exception on writing
         with pytest.raises(exc):
             with tm.ensure_clean() as path:
                 to_parquet(df, path, engine, compression=None)
 
     def check_round_trip(self, df, engine, expected=None,
-                         write_kwargs=None, read_kwargs=None):
+                         write_kwargs=None, read_kwargs=None,
+                         check_names=True):
         if write_kwargs is None:
             write_kwargs = {}
         if read_kwargs is None:
@@ -226,7 +225,7 @@ def check_round_trip(self, df, engine, expected=None,
 
         if expected is None:
             expected = df
-        tm.assert_frame_equal(result, expected)
+        tm.assert_frame_equal(result, expected, check_names=check_names)
 
         # repeat
         to_parquet(df, path, engine, **write_kwargs)
@@ -234,7 +233,7 @@ def check_round_trip(self, df, engine, expected=None,
 
         if expected is None:
             expected = df
-        tm.assert_frame_equal(result, expected)
+        tm.assert_frame_equal(result, expected, check_names=check_names)
 
 
 class TestBasic(Base):
@@ -273,33 +272,6 @@ def test_columns_dtypes_invalid(self, engine):
                 datetime.datetime(2011, 1, 1, 1, 1)]
         self.check_error_on_write(df, engine, ValueError)
 
-    def test_write_with_index(self, engine):
-
-        df = pd.DataFrame({'A': [1, 2, 3]})
-        self.check_round_trip(df, engine, write_kwargs={'compression': None})
-
-        # non-default index
-        for index in [[2, 3, 4],
-                      pd.date_range('20130101', periods=3),
-                      list('abc'),
-                      [1, 3, 4],
-                      pd.MultiIndex.from_tuples([('a', 1), ('a', 2),
-                                                 ('b', 1)]),
-                      ]:
-
-            df.index = index
-            self.check_error_on_write(df, engine, ValueError)
-
-        # index with meta-data
-        df.index = [0, 1, 2]
-        df.index.name = 'foo'
-        self.check_error_on_write(df, engine, ValueError)
-
-        # column multi-index
-        df.index = [0, 1, 2]
-        df.columns = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)]),
-        self.check_error_on_write(df, engine, ValueError)
-
     @pytest.mark.parametrize('compression', [None, 'gzip', 'snappy', 'brotli'])
     def test_compression(self, engine, compression):
@@ -323,6 +295,72 @@ def test_read_columns(self, engine):
             write_kwargs={'compression': None},
             read_kwargs={'columns': ['string']})
 
+    def test_write_index(self, engine):
+        check_names = engine != 'fastparquet'
+
+        if engine == 'pyarrow':
+            import pyarrow
+            if LooseVersion(pyarrow.__version__) < LooseVersion('0.7.0'):
+                pytest.skip("pyarrow is < 0.7.0")
+
+        df = pd.DataFrame({'A': [1, 2, 3]})
+        self.check_round_trip(df, engine, write_kwargs={'compression': None})
+
+        indexes = [
+            [2, 3, 4],
+            pd.date_range('20130101', periods=3),
+            list('abc'),
+            [1, 3, 4],
+        ]
+        # non-default index
+        for index in indexes:
+            df.index = index
+            self.check_round_trip(
+                df, engine,
+                write_kwargs={'compression': None},
+                check_names=check_names)
+
+        # index with meta-data
+        df.index = [0, 1, 2]
+        df.index.name = 'foo'
+        self.check_round_trip(df, engine, write_kwargs={'compression': None})
+
+    def test_write_multiindex(self, pa_ge_070):
+        # Not supported in fastparquet as of 0.1.3 or in older pyarrow versions
+        engine = pa_ge_070
+
+        df = pd.DataFrame({'A': [1, 2, 3]})
+        index = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)])
+        df.index = index
+        self.check_round_trip(df, engine, write_kwargs={'compression': None})
+
+    def test_write_column_multiindex(self, engine):
+        # column multi-index
+        mi_columns = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)])
+        df = pd.DataFrame(np.random.randn(4, 3), columns=mi_columns)
+        self.check_error_on_write(df, engine, ValueError)
+
+    def test_multiindex_with_columns(self, pa_ge_070):
+
+        engine = pa_ge_070
+        dates = pd.date_range('01-Jan-2018', '01-Dec-2018', freq='MS')
+        df = pd.DataFrame(np.random.randn(2 * len(dates), 3),
+                          columns=list('ABC'))
+        index1 = pd.MultiIndex.from_product(
+            [['Level1', 'Level2'], dates],
+            names=['level', 'date'])
+        index2 = index1.copy(names=None)
+        for index in [index1, index2]:
+            df.index = index
+            with tm.ensure_clean() as path:
+                df.to_parquet(path, engine)
+                result = read_parquet(path, engine)
+                expected = df
+                tm.assert_frame_equal(result, expected)
+                result = read_parquet(path, engine, columns=['A', 'B'])
+                expected = df[['A', 'B']]
+                tm.assert_frame_equal(result, expected)
+
 
 class TestParquetPyArrow(Base):
@@ -352,14 +390,12 @@ def test_basic_subset_columns(self, pa, df_full):
             read_kwargs={'columns': ['string', 'int']})
 
     def test_duplicate_columns(self, pa):
-
         # not currently able to handle duplicate columns
         df = pd.DataFrame(np.arange(12).reshape(4, 3),
                           columns=list('aaa')).copy()
         self.check_error_on_write(df, pa, ValueError)
 
     def test_unsupported(self, pa):
-
         # period
         df = pd.DataFrame({'a': pd.period_range('2013', freq='M', periods=3)})
         self.check_error_on_write(df, pa, ValueError)