From f4330611ff5ac1cbb4a89c4a7dab3d0900f9e64a Mon Sep 17 00:00:00 2001 From: Jeff Reback Date: Wed, 2 Aug 2017 05:47:57 -0400 Subject: [PATCH] ENH: add to/from_parquet with pyarrow & fastparquet (#15838) --- ci/install_travis.sh | 1 + ci/requirements-2.7.sh | 2 +- ci/requirements-3.5.sh | 4 +- ci/requirements-3.5_OSX.sh | 2 +- ci/requirements-3.6.pip | 1 + ci/requirements-3.6.run | 2 + ci/requirements-3.6_DOC.sh | 2 +- ci/requirements-3.6_WIN.run | 2 + doc/source/install.rst | 1 + doc/source/io.rst | 82 ++++++- doc/source/options.rst | 3 + doc/source/whatsnew/v0.21.0.txt | 1 + pandas/core/config_init.py | 12 + pandas/core/frame.py | 24 ++ pandas/io/api.py | 1 + pandas/io/feather_format.py | 4 +- pandas/io/parquet.py | 194 +++++++++++++++++ pandas/tests/api/test_api.py | 2 +- pandas/tests/io/test_parquet.py | 374 ++++++++++++++++++++++++++++++++ pandas/util/_print_versions.py | 1 + 20 files changed, 703 insertions(+), 12 deletions(-) create mode 100644 pandas/io/parquet.py create mode 100644 pandas/tests/io/test_parquet.py diff --git a/ci/install_travis.sh b/ci/install_travis.sh index dcc1656ce3dd7..df6969c7cc659 100755 --- a/ci/install_travis.sh +++ b/ci/install_travis.sh @@ -153,6 +153,7 @@ fi echo echo "[removing installed pandas]" conda remove pandas -y --force +pip uninstall -y pandas if [ "$BUILD_TEST" ]; then diff --git a/ci/requirements-2.7.sh b/ci/requirements-2.7.sh index 5b20617f55759..e3bd5e46026c5 100644 --- a/ci/requirements-2.7.sh +++ b/ci/requirements-2.7.sh @@ -4,4 +4,4 @@ source activate pandas echo "install 27" -conda install -n pandas -c conda-forge feather-format pyarrow=0.4.1 +conda install -n pandas -c conda-forge feather-format pyarrow=0.4.1 fastparquet diff --git a/ci/requirements-3.5.sh b/ci/requirements-3.5.sh index 3b8fe793a413d..33db9c28c78a9 100644 --- a/ci/requirements-3.5.sh +++ b/ci/requirements-3.5.sh @@ -4,8 +4,8 @@ source activate pandas echo "install 35" -conda install -n pandas -c conda-forge feather-format pyarrow=0.4.1 - # pip install python-dateutil to get latest conda remove -n pandas python-dateutil --force pip install python-dateutil + +conda install -n pandas -c conda-forge feather-format pyarrow=0.4.1 diff --git a/ci/requirements-3.5_OSX.sh b/ci/requirements-3.5_OSX.sh index 39ea1a0cf67bf..c2978b175968c 100644 --- a/ci/requirements-3.5_OSX.sh +++ b/ci/requirements-3.5_OSX.sh @@ -4,4 +4,4 @@ source activate pandas echo "install 35_OSX" -conda install -n pandas -c conda-forge feather-format==0.3.1 +conda install -n pandas -c conda-forge feather-format==0.3.1 fastparquet diff --git a/ci/requirements-3.6.pip b/ci/requirements-3.6.pip index e69de29bb2d1d..753a60d6c119a 100644 --- a/ci/requirements-3.6.pip +++ b/ci/requirements-3.6.pip @@ -0,0 +1 @@ +brotlipy diff --git a/ci/requirements-3.6.run b/ci/requirements-3.6.run index 00db27d3f2704..822144a80bc9a 100644 --- a/ci/requirements-3.6.run +++ b/ci/requirements-3.6.run @@ -17,6 +17,8 @@ pymysql feather-format pyarrow psycopg2 +python-snappy +fastparquet beautifulsoup4 s3fs xarray diff --git a/ci/requirements-3.6_DOC.sh b/ci/requirements-3.6_DOC.sh index 8c10a794a13b9..aec0f62148622 100644 --- a/ci/requirements-3.6_DOC.sh +++ b/ci/requirements-3.6_DOC.sh @@ -6,6 +6,6 @@ echo "[install DOC_BUILD deps]" pip install pandas-gbq -conda install -n pandas -c conda-forge feather-format pyarrow nbsphinx pandoc +conda install -n pandas -c conda-forge feather-format pyarrow nbsphinx pandoc fastparquet conda install -n pandas -c r r rpy2 --yes diff --git a/ci/requirements-3.6_WIN.run 
b/ci/requirements-3.6_WIN.run index 22aae8944d731..226caa458f6ee 100644 --- a/ci/requirements-3.6_WIN.run +++ b/ci/requirements-3.6_WIN.run @@ -13,3 +13,5 @@ numexpr pytables matplotlib blosc +fastparquet +pyarrow diff --git a/doc/source/install.rst b/doc/source/install.rst index c185a7cf4b875..01a01b1b58b4c 100644 --- a/doc/source/install.rst +++ b/doc/source/install.rst @@ -237,6 +237,7 @@ Optional Dependencies * `xarray `__: pandas like handling for > 2 dims, needed for converting Panels to xarray objects. Version 0.7.0 or higher is recommended. * `PyTables `__: necessary for HDF5-based storage. Version 3.0.0 or higher required, Version 3.2.1 or higher highly recommended. * `Feather Format `__: necessary for feather-based storage, version 0.3.1 or higher. +* ``Apache Parquet Format``, either `pyarrow `__ (>= 0.4.1) or `fastparquet `__ (>= 0.0.6) for parquet-based storage. The `snappy `__ and `brotli `__ libraries are available for compression support. * `SQLAlchemy `__: for SQL database support. Version 0.8.1 or higher recommended. Besides SQLAlchemy, you also need a database specific driver. You can find an overview of supported drivers for each SQL dialect in the `SQLAlchemy docs `__. Some common drivers are: * `psycopg2 `__: for PostgreSQL diff --git a/doc/source/io.rst b/doc/source/io.rst index bf68a0cae1d27..0b97264abfcd7 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -43,6 +43,7 @@ object. The corresponding ``writer`` functions are object methods that are acces binary;`MS Excel `__;:ref:`read_excel`;:ref:`to_excel` binary;`HDF5 Format `__;:ref:`read_hdf`;:ref:`to_hdf` binary;`Feather Format `__;:ref:`read_feather`;:ref:`to_feather` + binary;`Parquet Format `__;:ref:`read_parquet`;:ref:`to_parquet` binary;`Msgpack `__;:ref:`read_msgpack`;:ref:`to_msgpack` binary;`Stata `__;:ref:`read_stata`;:ref:`to_stata` binary;`SAS `__;:ref:`read_sas`; @@ -209,7 +210,7 @@ buffer_lines : int, default None .. deprecated:: 0.19.0 Argument removed because its value is not respected by the parser - + compact_ints : boolean, default False .. deprecated:: 0.19.0 @@ -4087,7 +4088,7 @@ control compression: ``complevel`` and ``complib``. ``complevel`` specifies if and how hard data is to be compressed. ``complevel=0`` and ``complevel=None`` disables compression and ``0`_: Fast compression and decompression. .. versionadded:: 0.20.2 - + Support for alternative blosc compressors: - + - `blosc:blosclz `_ This is the default compressor for ``blosc`` - `blosc:lz4 @@ -4545,6 +4546,79 @@ Read from a feather file. import os os.remove('example.feather') + +.. _io.parquet: + +Parquet +------- + +.. versionadded:: 0.21.0 + +`Parquet `__ provides a partitioned binary columnar serialization format for data frames; pandas can read and write it with either the ``pyarrow`` or the ``fastparquet`` engine. + +.. note:: + + These engines are very similar and should read/write nearly identical parquet format files. + The libraries differ in their underlying dependencies: ``fastparquet`` uses ``numba``, while ``pyarrow`` uses a C library. + +.. ipython:: python + + df = pd.DataFrame({'a': list('abc'), + 'b': list(range(1, 4)), + 'c': np.arange(3, 6).astype('u1'), + 'd': np.arange(4.0, 7.0, dtype='float64'), + 'e': [True, False, True], + 'f': pd.date_range('20130101', periods=3), + 'g': pd.date_range('20130101', periods=3, tz='US/Eastern'), + 'h': pd.date_range('20130101', periods=3, freq='ns')}) + + df + df.dtypes + +Write to a parquet file. + +.. ipython:: python + + df.to_parquet('example_pa.parquet', engine='pyarrow') + df.to_parquet('example_fp.parquet', engine='fastparquet') + +Read from a parquet file. + +..
ipython:: python + + result = pd.read_parquet('example_pa.parquet', engine='pyarrow') + result = pd.read_parquet('example_fp.parquet', engine='fastparquet') + + result.dtypes + +.. ipython:: python + :suppress: + + import os + os.remove('example_pa.parquet') + os.remove('example_fp.parquet') + .. _io.sql: SQL Queries diff --git a/doc/source/options.rst b/doc/source/options.rst index 83b08acac5720..51d02bc89692a 100644 --- a/doc/source/options.rst +++ b/doc/source/options.rst @@ -414,6 +414,9 @@ io.hdf.default_format None default format writing format, 'table' io.hdf.dropna_table True drop ALL nan rows when appending to a table +io.parquet.engine auto The engine to use as a default for + parquet reading and writing. If 'auto', + then try 'pyarrow' and fall back to 'fastparquet' mode.chained_assignment warn Raise an exception, warn, or no action if trying to use chained assignment, The default is warn diff --git a/doc/source/whatsnew/v0.21.0.txt b/doc/source/whatsnew/v0.21.0.txt index 9ec859d5e5b0f..fad6647d4de8d 100644 --- a/doc/source/whatsnew/v0.21.0.txt +++ b/doc/source/whatsnew/v0.21.0.txt @@ -78,6 +78,7 @@ Other Enhancements - :func:`DataFrame.select_dtypes` now accepts scalar values for include/exclude as well as list-like. (:issue:`16855`) - :func:`date_range` now accepts 'YS' in addition to 'AS' as an alias for start of year (:issue:`9313`) - :func:`date_range` now accepts 'Y' in addition to 'A' as an alias for end of year (:issue:`9313`) +- Integration with Apache Parquet, including a new top-level ``pd.read_parquet()`` function and a ``DataFrame.to_parquet()`` method, see :ref:`here `. .. _whatsnew_0210.api_breaking: diff --git a/pandas/core/config_init.py b/pandas/core/config_init.py index 875ab8249f953..ea5c213dbe057 100644 --- a/pandas/core/config_init.py +++ b/pandas/core/config_init.py @@ -465,3 +465,15 @@ def _register_xlsx(engine, other): except ImportError: # fallback _register_xlsx('openpyxl', 'xlsxwriter') + +# Set up the io.parquet specific configuration. +parquet_engine_doc = """ +: string + The default parquet reader/writer engine. Available options: + 'auto', 'pyarrow', 'fastparquet', the default is 'auto' +""" + +with cf.config_prefix('io.parquet'): + cf.register_option( + 'engine', 'auto', parquet_engine_doc, + validator=is_one_of_factory(['auto', 'pyarrow', 'fastparquet'])) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index e546e96f253c7..9d63bd2e120aa 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -1598,6 +1598,30 @@ def to_feather(self, fname): from pandas.io.feather_format import to_feather to_feather(self, fname) + def to_parquet(self, fname, engine='auto', compression='snappy', + **kwargs): + """ + Write a DataFrame to the binary parquet format. + + .. versionadded:: 0.21.0 + + Parameters + ---------- + fname : str + string file path + engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' + Parquet library to use. If 'auto', then the option + 'io.parquet.engine' is used; if that is also 'auto', then + 'pyarrow' is tried first and 'fastparquet' is used as a fallback. + compression : str, optional, default 'snappy' + compression method, includes {'gzip', 'snappy', 'brotli'} + kwargs + Additional keyword arguments passed to the engine + """ + from pandas.io.parquet import to_parquet + to_parquet(self, fname, engine, + compression=compression, **kwargs) + @Substitution(header='Write out column names. If a list of string is given, \ it is assumed to be aliases for the column names') @Appender(fmt.docstring_to_string, indents=1)
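For orientation, a minimal round trip with the API added above might look like the sketch below. This is illustrative only: it assumes a pandas build containing this patch plus at least one of pyarrow >= 0.4.1 or fastparquet installed, and 'example.parquet' is a hypothetical path.

    import pandas as pd

    df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})

    # engine='auto' (the default) consults the io.parquet.engine option, which
    # itself defaults to 'auto': try pyarrow first, then fall back to fastparquet.
    with pd.option_context('io.parquet.engine', 'auto'):
        df.to_parquet('example.parquet', compression=None)

    result = pd.read_parquet('example.parquet')
    assert result.equals(df)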
diff --git a/pandas/io/api.py b/pandas/io/api.py index a4a25b78942db..f542a8176dce7 100644 --- a/pandas/io/api.py +++ b/pandas/io/api.py @@ -13,6 +13,7 @@ from pandas.io.sql import read_sql, read_sql_table, read_sql_query from pandas.io.sas import read_sas from pandas.io.feather_format import read_feather +from pandas.io.parquet import read_parquet from pandas.io.stata import read_stata from pandas.io.pickle import read_pickle, to_pickle from pandas.io.packers import read_msgpack, to_msgpack diff --git a/pandas/io/feather_format.py b/pandas/io/feather_format.py index 86d58caa5e816..87a4931421d7d 100644 --- a/pandas/io/feather_format.py +++ b/pandas/io/feather_format.py @@ -19,7 +19,7 @@ def _try_import(): "you can install via conda\n" "conda install feather-format -c conda-forge\n" "or via pip\n" - "pip install feather-format\n") + "pip install -U feather-format\n") try: feather.__version__ >= LooseVersion('0.3.1') @@ -29,7 +29,7 @@ def _try_import(): "you can install via conda\n" "conda install feather-format -c conda-forge" "or via pip\n" - "pip install feather-format\n") + "pip install -U feather-format\n") return feather diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py new file mode 100644 index 0000000000000..0a4426b55b323 --- /dev/null +++ b/pandas/io/parquet.py @@ -0,0 +1,194 @@ +""" parquet compat """ + +from warnings import catch_warnings +from distutils.version import LooseVersion +from pandas import DataFrame, RangeIndex, Int64Index, get_option +from pandas.compat import range +from pandas.io.common import get_filepath_or_buffer + + +def get_engine(engine): + """ return our implementation """ + + if engine == 'auto': + engine = get_option('io.parquet.engine') + + if engine == 'auto': + # try engines in this order + try: + return PyArrowImpl() + except ImportError: + pass + + try: + return FastParquetImpl() + except ImportError: + pass + + if engine not in ['pyarrow', 'fastparquet']: + raise ValueError("engine must be one of 'pyarrow', 'fastparquet'") + + if engine == 'pyarrow': + return PyArrowImpl() + elif engine == 'fastparquet': + return FastParquetImpl() + + +class PyArrowImpl(object): + + def __init__(self): + # since pandas is a dependency of pyarrow + # we need to import on first use + + try: + import pyarrow + import pyarrow.parquet + except ImportError: + raise ImportError("pyarrow is required for parquet support\n\n" + "you can install via conda\n" + "conda install pyarrow -c conda-forge\n" + "\nor via pip\n" + "pip install -U pyarrow\n") + + if LooseVersion(pyarrow.__version__) < '0.4.1': + raise ImportError("pyarrow >= 0.4.1 is required for parquet " + "support\n\n" + "you can install via conda\n" + "conda install pyarrow -c conda-forge\n" + "\nor via pip\n" + "pip install -U pyarrow\n") + + self.api = pyarrow + + def write(self, df, path, compression='snappy', **kwargs): + path, _, _ = get_filepath_or_buffer(path) + table = self.api.Table.from_pandas(df, timestamps_to_ms=True) + self.api.parquet.write_table( + table, path, compression=compression, **kwargs) + + def read(self, path): + path, _, _ = get_filepath_or_buffer(path) + return self.api.parquet.read_table(path).to_pandas()
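# Illustrative only, not part of pandas/io/parquet.py: a sketch of how
# get_engine() above resolves the io.parquet.engine option. It assumes a
# pandas build containing this patch with both pyarrow and fastparquet
# importable, and mirrors the behaviour exercised by test_options_get_engine.
import pandas as pd
from pandas.io.parquet import FastParquetImpl, PyArrowImpl, get_engine

with pd.option_context('io.parquet.engine', 'fastparquet'):
    assert isinstance(get_engine('auto'), FastParquetImpl)

with pd.option_context('io.parquet.engine', 'auto'):
    # 'auto' tries pyarrow first, falling back to fastparquet
    assert isinstance(get_engine('auto'), PyArrowImpl)

assert isinstance(get_engine('pyarrow'), PyArrowImpl)
assert isinstance(get_engine('fastparquet'), FastParquetImpl)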
+ + +class FastParquetImpl(object): + + def __init__(self): + # since pandas is a dependency of fastparquet + # we need to import on first use + + try: + import fastparquet + except ImportError: + raise ImportError("fastparquet is required for parquet " + "support\n\n" + "you can install via conda\n" + "conda install fastparquet -c conda-forge\n" + "\nor via pip\n" + "pip install -U fastparquet") + + if LooseVersion(fastparquet.__version__) < '0.1.0': + raise ImportError("fastparquet >= 0.1.0 is required for parquet " + "support\n\n" + "you can install via conda\n" + "conda install fastparquet -c conda-forge\n" + "\nor via pip\n" + "pip install -U fastparquet") + + self.api = fastparquet + + def write(self, df, path, compression='snappy', **kwargs): + # thriftpy/protocol/compact.py:339: + # DeprecationWarning: tostring() is deprecated. + # Use tobytes() instead. + path, _, _ = get_filepath_or_buffer(path) + with catch_warnings(record=True): + self.api.write(path, df, + compression=compression, **kwargs) + + def read(self, path): + path, _, _ = get_filepath_or_buffer(path) + return self.api.ParquetFile(path).to_pandas() + + +def to_parquet(df, path, engine='auto', compression='snappy', **kwargs): + """ + Write a DataFrame to the parquet format. + + Parameters + ---------- + df : DataFrame + path : string + File path + engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' + Parquet library to use. If 'auto', then the option + 'io.parquet.engine' is used; if that is also 'auto', then + 'pyarrow' is tried first and 'fastparquet' is used as a fallback. + compression : str, optional, default 'snappy' + compression method, includes {'gzip', 'snappy', 'brotli'} + kwargs + Additional keyword arguments passed to the engine + """ + + impl = get_engine(engine) + + if not isinstance(df, DataFrame): + raise ValueError("to_parquet only supports IO with DataFrames") + + valid_types = {'string', 'unicode'} + + # validate index + # -------------- + + # validate that we have only a default index + # raise on anything else as we don't serialize the index + + if not isinstance(df.index, Int64Index): + raise ValueError("parquet does not support serializing {} " + "for the index; you can .reset_index() " + "to make the index into column(s)".format( + type(df.index))) + + if not df.index.equals(RangeIndex.from_range(range(len(df)))): + raise ValueError("parquet does not support serializing a " + "non-default index for the index; you " + "can .reset_index() to make the index " + "into column(s)") + + if df.index.name is not None: + raise ValueError("parquet does not serialize index meta-data on a " + "default index") + + # validate columns + # ---------------- + + # must have valid column names (strings only) + if df.columns.inferred_type not in valid_types: + raise ValueError("parquet must have string column names") + + return impl.write(df, path, compression=compression, **kwargs) + + +def read_parquet(path, engine='auto', **kwargs): + """ + Load a parquet object from the file path, returning a DataFrame. + + .. versionadded:: 0.21.0 + + Parameters + ---------- + path : string + File path + engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' + Parquet reader library to use. If 'auto', then the option + 'io.parquet.engine' is used; if that is also 'auto', then + 'pyarrow' is tried first and 'fastparquet' is used as a fallback.
+ kwargs are passed to the engine + + Returns + ------- + DataFrame + + """ + + impl = get_engine(engine) + return impl.read(path) diff --git a/pandas/tests/api/test_api.py b/pandas/tests/api/test_api.py index 433ed7e517b1c..09cccd54b74f8 100644 --- a/pandas/tests/api/test_api.py +++ b/pandas/tests/api/test_api.py @@ -82,7 +82,7 @@ class TestPDApi(Base): 'read_gbq', 'read_hdf', 'read_html', 'read_json', 'read_msgpack', 'read_pickle', 'read_sas', 'read_sql', 'read_sql_query', 'read_sql_table', 'read_stata', - 'read_table', 'read_feather'] + 'read_table', 'read_feather', 'read_parquet'] # top-level to_* funcs funcs_to = ['to_datetime', 'to_msgpack', diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py new file mode 100644 index 0000000000000..ff0935c7dcc6f --- /dev/null +++ b/pandas/tests/io/test_parquet.py @@ -0,0 +1,374 @@ +""" test parquet compat """ + +import pytest +import datetime +from warnings import catch_warnings + +import numpy as np +import pandas as pd +from pandas.compat import PY3, is_platform_windows +from pandas.io.parquet import (to_parquet, read_parquet, get_engine, + PyArrowImpl, FastParquetImpl) +from pandas.util import testing as tm + +try: + import pyarrow # noqa + _HAVE_PYARROW = True +except ImportError: + _HAVE_PYARROW = False + +try: + import fastparquet # noqa + _HAVE_FASTPARQUET = True +except ImportError: + _HAVE_FASTPARQUET = False + + +# setup engines & skips +@pytest.fixture(params=[ + pytest.mark.skipif(not _HAVE_FASTPARQUET, + reason='fastparquet is not installed')('fastparquet'), + pytest.mark.skipif(not _HAVE_PYARROW, + reason='pyarrow is not installed')('pyarrow')]) +def engine(request): + return request.param + + +@pytest.fixture +def pa(): + if not _HAVE_PYARROW: + pytest.skip("pyarrow is not installed") + if is_platform_windows(): + pytest.skip("pyarrow-parquet not building on windows") + return 'pyarrow' + + +@pytest.fixture +def fp(): + if not _HAVE_FASTPARQUET: + pytest.skip("fastparquet is not installed") + return 'fastparquet' + + +@pytest.fixture +def df_compat(): + return pd.DataFrame({'A': [1, 2, 3], 'B': 'foo'}) + + +@pytest.fixture +def df_cross_compat(): + df = pd.DataFrame({'a': list('abc'), + 'b': list(range(1, 4)), + 'c': np.arange(3, 6).astype('u1'), + 'd': np.arange(4.0, 7.0, dtype='float64'), + 'e': [True, False, True], + 'f': pd.date_range('20130101', periods=3), + 'g': pd.date_range('20130101', periods=3, + tz='US/Eastern'), + 'h': pd.date_range('20130101', periods=3, freq='ns')}) + return df + + +def test_invalid_engine(df_compat): + + with pytest.raises(ValueError): + df_compat.to_parquet('foo', 'bar') + + +def test_options_py(df_compat, pa): + # use the set option + + df = df_compat + with tm.ensure_clean() as path: + + with pd.option_context('io.parquet.engine', 'pyarrow'): + df.to_parquet(path) + + result = read_parquet(path, compression=None) + tm.assert_frame_equal(result, df) + + +def test_options_fp(df_compat, fp): + # use the set option + + df = df_compat + with tm.ensure_clean() as path: + + with pd.option_context('io.parquet.engine', 'fastparquet'): + df.to_parquet(path, compression=None) + + result = read_parquet(path, compression=None) + tm.assert_frame_equal(result, df) + + +def test_options_auto(df_compat, fp, pa): + + df = df_compat + with tm.ensure_clean() as path: + + with pd.option_context('io.parquet.engine', 'auto'): + df.to_parquet(path) + + result = read_parquet(path, compression=None) + tm.assert_frame_equal(result, df) + + +def test_options_get_engine(fp, pa): + assert 
isinstance(get_engine('pyarrow'), PyArrowImpl) + assert isinstance(get_engine('fastparquet'), FastParquetImpl) + + with pd.option_context('io.parquet.engine', 'pyarrow'): + assert isinstance(get_engine('auto'), PyArrowImpl) + assert isinstance(get_engine('pyarrow'), PyArrowImpl) + assert isinstance(get_engine('fastparquet'), FastParquetImpl) + + with pd.option_context('io.parquet.engine', 'fastparquet'): + assert isinstance(get_engine('auto'), FastParquetImpl) + assert isinstance(get_engine('pyarrow'), PyArrowImpl) + assert isinstance(get_engine('fastparquet'), FastParquetImpl) + + with pd.option_context('io.parquet.engine', 'auto'): + assert isinstance(get_engine('auto'), PyArrowImpl) + assert isinstance(get_engine('pyarrow'), PyArrowImpl) + assert isinstance(get_engine('fastparquet'), FastParquetImpl) + + +@pytest.mark.xfail(reason="fp does not ignore pa index __index_level_0__") +def test_cross_engine_pa_fp(df_cross_compat, pa, fp): + # cross-compat with differing reading/writing engines + + df = df_cross_compat + with tm.ensure_clean() as path: + df.to_parquet(path, engine=pa, compression=None) + + result = read_parquet(path, engine=fp, compression=None) + tm.assert_frame_equal(result, df) + + +@pytest.mark.xfail(reason="pyarrow reading fp in some cases") +def test_cross_engine_fp_pa(df_cross_compat, pa, fp): + # cross-compat with differing reading/writing engines + + df = df_cross_compat + with tm.ensure_clean() as path: + df.to_parquet(path, engine=fp, compression=None) + + result = read_parquet(path, engine=pa, compression=None) + tm.assert_frame_equal(result, df) + + +class Base(object): + + def check_error_on_write(self, df, engine, exc): + # check that we are raising the exception + # on writing + + with pytest.raises(exc): + with tm.ensure_clean() as path: + to_parquet(df, path, engine, compression=None) + + def check_round_trip(self, df, engine, expected=None, **kwargs): + + with tm.ensure_clean() as path: + df.to_parquet(path, engine, **kwargs) + result = read_parquet(path, engine) + + if expected is None: + expected = df + tm.assert_frame_equal(result, expected) + + # repeat + to_parquet(df, path, engine, **kwargs) + result = pd.read_parquet(path, engine) + + if expected is None: + expected = df + tm.assert_frame_equal(result, expected) + + +class TestBasic(Base): + + def test_error(self, engine): + + for obj in [pd.Series([1, 2, 3]), 1, 'foo', pd.Timestamp('20130101'), + np.array([1, 2, 3])]: + self.check_error_on_write(obj, engine, ValueError) + + def test_columns_dtypes(self, engine): + + df = pd.DataFrame({'string': list('abc'), + 'int': list(range(1, 4))}) + + # unicode + df.columns = [u'foo', u'bar'] + self.check_round_trip(df, engine, compression=None) + + def test_columns_dtypes_invalid(self, engine): + + df = pd.DataFrame({'string': list('abc'), + 'int': list(range(1, 4))}) + + # numeric + df.columns = [0, 1] + self.check_error_on_write(df, engine, ValueError) + + if PY3: + # bytes on PY3, on PY2 these are str + df.columns = [b'foo', b'bar'] + self.check_error_on_write(df, engine, ValueError) + + # python object + df.columns = [datetime.datetime(2011, 1, 1, 0, 0), + datetime.datetime(2011, 1, 1, 1, 1)] + self.check_error_on_write(df, engine, ValueError) + + def test_write_with_index(self, engine): + + df = pd.DataFrame({'A': [1, 2, 3]}) + self.check_round_trip(df, engine, compression=None) + + # non-default index + for index in [[2, 3, 4], + pd.date_range('20130101', periods=3), + list('abc'), + [1, 3, 4], + pd.MultiIndex.from_tuples([('a', 1), ('a', 2), + ('b', 
1)]), + ]: + + df.index = index + self.check_error_on_write(df, engine, ValueError) + + # index with meta-data + df.index = [0, 1, 2] + df.index.name = 'foo' + self.check_error_on_write(df, engine, ValueError) + + # column multi-index + df.index = [0, 1, 2] + df.columns = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)]), + self.check_error_on_write(df, engine, ValueError) + + @pytest.mark.parametrize('compression', [None, 'gzip', 'snappy', 'brotli']) + def test_compression(self, engine, compression): + + if compression == 'snappy': + pytest.importorskip('snappy') + + elif compression == 'brotli': + pytest.importorskip('brotli') + + df = pd.DataFrame({'A': [1, 2, 3]}) + self.check_round_trip(df, engine, compression=compression) + + +class TestParquetPyArrow(Base): + + def test_basic(self, pa): + + df = pd.DataFrame({'string': list('abc'), + 'string_with_nan': ['a', np.nan, 'c'], + 'string_with_none': ['a', None, 'c'], + 'bytes': [b'foo', b'bar', b'baz'], + 'unicode': [u'foo', u'bar', u'baz'], + 'int': list(range(1, 4)), + 'uint': np.arange(3, 6).astype('u1'), + 'float': np.arange(4.0, 7.0, dtype='float64'), + 'float_with_nan': [2., np.nan, 3.], + 'bool': [True, False, True], + 'bool_with_none': [True, None, True], + 'datetime_ns': pd.date_range('20130101', periods=3), + 'datetime_with_nat': [pd.Timestamp('20130101'), + pd.NaT, + pd.Timestamp('20130103')] + }) + + self.check_round_trip(df, pa) + + def test_duplicate_columns(self, pa): + + # not currently able to handle duplicate columns + df = pd.DataFrame(np.arange(12).reshape(4, 3), + columns=list('aaa')).copy() + self.check_error_on_write(df, pa, ValueError) + + def test_unsupported(self, pa): + + # period + df = pd.DataFrame({'a': pd.period_range('2013', freq='M', periods=3)}) + self.check_error_on_write(df, pa, ValueError) + + # categorical + df = pd.DataFrame({'a': pd.Categorical(list('abc'))}) + self.check_error_on_write(df, pa, NotImplementedError) + + # timedelta + df = pd.DataFrame({'a': pd.timedelta_range('1 day', + periods=3)}) + self.check_error_on_write(df, pa, NotImplementedError) + + # mixed python objects + df = pd.DataFrame({'a': ['a', 1, 2.0]}) + self.check_error_on_write(df, pa, ValueError) + + +class TestParquetFastParquet(Base): + + def test_basic(self, fp): + + df = pd.DataFrame( + {'string': list('abc'), + 'string_with_nan': ['a', np.nan, 'c'], + 'string_with_none': ['a', None, 'c'], + 'bytes': [b'foo', b'bar', b'baz'], + 'unicode': [u'foo', u'bar', u'baz'], + 'int': list(range(1, 4)), + 'uint': np.arange(3, 6).astype('u1'), + 'float': np.arange(4.0, 7.0, dtype='float64'), + 'float_with_nan': [2., np.nan, 3.], + 'bool': [True, False, True], + 'datetime': pd.date_range('20130101', periods=3), + 'datetime_with_nat': [pd.Timestamp('20130101'), + pd.NaT, + pd.Timestamp('20130103')], + 'timedelta': pd.timedelta_range('1 day', periods=3), + }) + + self.check_round_trip(df, fp, compression=None) + + @pytest.mark.skip(reason="not supported") + def test_duplicate_columns(self, fp): + + # not currently able to handle duplicate columns + df = pd.DataFrame(np.arange(12).reshape(4, 3), + columns=list('aaa')).copy() + self.check_error_on_write(df, fp, ValueError) + + def test_bool_with_none(self, fp): + df = pd.DataFrame({'a': [True, None, False]}) + expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16') + self.check_round_trip(df, fp, expected=expected, compression=None) + + def test_unsupported(self, fp): + + # period + df = pd.DataFrame({'a': pd.period_range('2013', freq='M', periods=3)}) + 
self.check_error_on_write(df, fp, ValueError) + + # mixed + df = pd.DataFrame({'a': ['a', 1, 2.0]}) + self.check_error_on_write(df, fp, ValueError) + + def test_categorical(self, fp): + df = pd.DataFrame({'a': pd.Categorical(list('abc'))}) + self.check_round_trip(df, fp, compression=None) + + def test_datetime_tz(self, fp): + # doesn't preserve tz + df = pd.DataFrame({'a': pd.date_range('20130101', periods=3, + tz='US/Eastern')}) + + # warns on the coercion + with catch_warnings(record=True): + self.check_round_trip(df, fp, df.astype('datetime64[ns]'), + compression=None) diff --git a/pandas/util/_print_versions.py b/pandas/util/_print_versions.py index 48b19b02e297e..9ecd4b10365c8 100644 --- a/pandas/util/_print_versions.py +++ b/pandas/util/_print_versions.py @@ -94,6 +94,7 @@ def show_versions(as_json=False): ("psycopg2", lambda mod: mod.__version__), ("jinja2", lambda mod: mod.__version__), ("s3fs", lambda mod: mod.__version__), + ("fastparquet", lambda mod: mod.__version__), ("pandas_gbq", lambda mod: mod.__version__), ("pandas_datareader", lambda mod: mod.__version__), ]
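A usage note on the index validation in ``to_parquet`` above: only a default ``RangeIndex`` is serialized, so any other index has to be reset first. The sketch below is illustrative only; it assumes a pandas build containing this patch with one of the engines installed, and 'example.parquet' is a hypothetical path. The new test module can be run with ``pytest pandas/tests/io/test_parquet.py``.

    import pandas as pd

    df = pd.DataFrame({'a': [1, 2, 3]}, index=['x', 'y', 'z'])

    try:
        df.to_parquet('example.parquet', compression=None)
    except ValueError as err:
        # "parquet does not support serializing ... for the index; you can
        # .reset_index() to make the index into column(s)"
        print(err)

    # Workaround described in the error message: move the index into a column.
    df.reset_index().to_parquet('example.parquet', compression=None)
    print(pd.read_parquet('example.parquet'))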