diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 5cceb2a9bce8c..690e6b8f725ad 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -230,6 +230,7 @@ Other enhancements - :class:`DatetimeIndex` and :class:`Series` with ``datetime64`` or ``datetime64tz`` dtypes now support ``std`` (:issue:`37436`) - :class:`Window` now supports all Scipy window types in ``win_type`` with flexible keyword argument support (:issue:`34556`) - :meth:`testing.assert_index_equal` now has a ``check_order`` parameter that allows indexes to be checked in an order-insensitive manner (:issue:`37478`) +- :func:`read_csv` supports memory-mapping for compressed files (:issue:`37621`) .. _whatsnew_120.api_breaking.python: diff --git a/pandas/io/common.py b/pandas/io/common.py index 90a79e54015c4..910eb23d9a2d0 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -107,6 +107,7 @@ class IOHandles: handle: Buffer created_handles: List[Buffer] = dataclasses.field(default_factory=list) is_wrapped: bool = False + is_mmap: bool = False def close(self) -> None: """ @@ -604,49 +605,49 @@ def get_handle( except ImportError: pass - handles: List[Buffer] = list() - # Windows does not default to utf-8. Set to utf-8 for a consistent behavior if encoding is None: encoding = "utf-8" # Convert pathlib.Path/py.path.local or string - path_or_buf = stringify_path(path_or_buf) - is_path = isinstance(path_or_buf, str) - f = path_or_buf + handle = stringify_path(path_or_buf) compression, compression_args = get_compression_method(compression) - if is_path: - compression = infer_compression(path_or_buf, compression) + compression = infer_compression(handle, compression) - if compression: + # memory mapping needs to be the first step + handle, memory_map, handles = _maybe_memory_map( + handle, memory_map, encoding, mode, errors + ) + is_path = isinstance(handle, str) + if compression: # GZ Compression if compression == "gzip": if is_path: - assert isinstance(path_or_buf, str) - f = gzip.GzipFile(filename=path_or_buf, mode=mode, **compression_args) + assert isinstance(handle, str) + handle = gzip.GzipFile(filename=handle, mode=mode, **compression_args) else: - f = gzip.GzipFile( - fileobj=path_or_buf, # type: ignore[arg-type] + handle = gzip.GzipFile( + fileobj=handle, # type: ignore[arg-type] mode=mode, **compression_args, ) # BZ Compression elif compression == "bz2": - f = bz2.BZ2File( - path_or_buf, mode=mode, **compression_args # type: ignore[arg-type] + handle = bz2.BZ2File( + handle, mode=mode, **compression_args # type: ignore[arg-type] ) # ZIP Compression elif compression == "zip": - f = _BytesZipFile(path_or_buf, mode, **compression_args) - if f.mode == "r": - handles.append(f) - zip_names = f.namelist() + handle = _BytesZipFile(handle, mode, **compression_args) + if handle.mode == "r": + handles.append(handle) + zip_names = handle.namelist() if len(zip_names) == 1: - f = f.open(zip_names.pop()) + handle = handle.open(zip_names.pop()) elif len(zip_names) == 0: raise ValueError(f"Zero files found in ZIP file {path_or_buf}") else: @@ -657,64 +658,52 @@ def get_handle( # XZ Compression elif compression == "xz": - f = get_lzma_file(lzma)(path_or_buf, mode) + handle = get_lzma_file(lzma)(handle, mode) # Unrecognized Compression else: msg = f"Unrecognized compression type: {compression}" raise ValueError(msg) - assert not isinstance(f, str) - handles.append(f) + assert not isinstance(handle, str) + handles.append(handle) elif is_path: # Check whether the filename is to be opened in binary mode. # Binary mode does not support 'encoding' and 'newline'. - is_binary_mode = "b" in mode - assert isinstance(path_or_buf, str) - if encoding and not is_binary_mode: + assert isinstance(handle, str) + if encoding and "b" not in mode: # Encoding - f = open(path_or_buf, mode, encoding=encoding, errors=errors, newline="") + handle = open(handle, mode, encoding=encoding, errors=errors, newline="") else: # Binary mode - f = open(path_or_buf, mode) - handles.append(f) + handle = open(handle, mode) + handles.append(handle) # Convert BytesIO or file objects passed with an encoding is_wrapped = False if is_text and ( compression - or isinstance(f, need_text_wrapping) - or "b" in getattr(f, "mode", "") + or isinstance(handle, need_text_wrapping) + or "b" in getattr(handle, "mode", "") ): - f = TextIOWrapper( - f, encoding=encoding, errors=errors, newline="" # type: ignore[arg-type] + handle = TextIOWrapper( + handle, # type: ignore[arg-type] + encoding=encoding, + errors=errors, + newline="", ) - handles.append(f) + handles.append(handle) # do not mark as wrapped when the user provided a string is_wrapped = not is_path - if memory_map and hasattr(f, "fileno"): - assert not isinstance(f, str) - try: - wrapped = cast(mmap.mmap, _MMapWrapper(f)) # type: ignore[arg-type] - f.close() - handles.remove(f) - handles.append(wrapped) - f = wrapped - except Exception: - # we catch any errors that may have occurred - # because that is consistent with the lower-level - # functionality of the C engine (pd.read_csv), so - # leave the file handler as is then - pass - handles.reverse() # close the most recently added buffer first - assert not isinstance(f, str) + assert not isinstance(handle, str) return IOHandles( - handle=f, + handle=handle, created_handles=handles, is_wrapped=is_wrapped, + is_mmap=memory_map, ) @@ -778,9 +767,16 @@ class _MMapWrapper(abc.Iterator): """ def __init__(self, f: IO): + self.attributes = {} + for attribute in ("seekable", "readable", "writeable"): + if not hasattr(f, attribute): + continue + self.attributes[attribute] = getattr(f, attribute)() self.mmap = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) def __getattr__(self, name: str): + if name in self.attributes: + return lambda: self.attributes[name] return getattr(self.mmap, name) def __iter__(self) -> "_MMapWrapper": @@ -799,3 +795,42 @@ def __next__(self) -> str: if newline == "": raise StopIteration return newline + + +def _maybe_memory_map( + handle: FileOrBuffer, + memory_map: bool, + encoding: str, + mode: str, + errors: Optional[str], +) -> Tuple[FileOrBuffer, bool, List[Buffer]]: + """Try to use memory map file/buffer.""" + handles: List[Buffer] = [] + memory_map &= hasattr(handle, "fileno") or isinstance(handle, str) + if not memory_map: + return handle, memory_map, handles + + # need to open the file first + if isinstance(handle, str): + if encoding and "b" not in mode: + # Encoding + handle = open(handle, mode, encoding=encoding, errors=errors, newline="") + else: + # Binary mode + handle = open(handle, mode) + handles.append(handle) + + try: + wrapped = cast(mmap.mmap, _MMapWrapper(handle)) # type: ignore[arg-type] + handle.close() + handles.remove(handle) + handles.append(wrapped) + handle = wrapped + except Exception: + # we catch any errors that may have occurred + # because that is consistent with the lower-level + # functionality of the C engine (pd.read_csv), so + # leave the file handler as is then + memory_map = False + + return handle, memory_map, handles diff --git a/pandas/io/parsers.py b/pandas/io/parsers.py index 3b72869188344..e4895d280c241 100644 --- a/pandas/io/parsers.py +++ b/pandas/io/parsers.py @@ -63,13 +63,7 @@ from pandas.core.series import Series from pandas.core.tools import datetimes as tools -from pandas.io.common import ( - get_compression_method, - get_filepath_or_buffer, - get_handle, - stringify_path, - validate_header_arg, -) +from pandas.io.common import get_filepath_or_buffer, get_handle, validate_header_arg from pandas.io.date_converters import generic_parser # BOM character (byte order mark) @@ -1834,16 +1828,6 @@ def __init__(self, src, **kwds): ParserBase.__init__(self, kwds) - if kwds.get("memory_map", False): - # memory-mapped files are directly handled by the TextReader. - src = stringify_path(src) - - if get_compression_method(kwds.get("compression", None))[0] is not None: - raise ValueError( - "read_csv does not support compression with memory_map=True. " - + "Please use memory_map=False instead." - ) - self.handles = get_handle( src, mode="r", @@ -1855,7 +1839,7 @@ def __init__(self, src, **kwds): kwds.pop("encoding", None) kwds.pop("memory_map", None) kwds.pop("compression", None) - if kwds.get("memory_map", False) and hasattr(self.handles.handle, "mmap"): + if self.handles.is_mmap and hasattr(self.handles.handle, "mmap"): self.handles.handle = self.handles.handle.mmap # #2442 diff --git a/pandas/tests/io/parser/test_common.py b/pandas/tests/io/parser/test_common.py index e61a5fce99c69..8f63d06859f62 100644 --- a/pandas/tests/io/parser/test_common.py +++ b/pandas/tests/io/parser/test_common.py @@ -2275,40 +2275,38 @@ def test_read_csv_file_handle(all_parsers, io_class, encoding): assert not handle.closed -def test_memory_map_compression_error(c_parser_only): +def test_memory_map_file_handle_silent_fallback(all_parsers, compression): """ - c-parsers do not support memory_map=True with compression. + Do not fail for buffers with memory_map=True (cannot memory map BytesIO). - GH 36997 + GH 37621 """ - parser = c_parser_only - df = DataFrame({"a": [1], "b": [2]}) - msg = ( - "read_csv does not support compression with memory_map=True. " - + "Please use memory_map=False instead." - ) + parser = all_parsers + expected = DataFrame({"a": [1], "b": [2]}) - with tm.ensure_clean() as path: - df.to_csv(path, compression="gzip", index=False) + handle = BytesIO() + expected.to_csv(handle, index=False, compression=compression, mode="wb") + handle.seek(0) - with pytest.raises(ValueError, match=msg): - parser.read_csv(path, memory_map=True, compression="gzip") + tm.assert_frame_equal( + parser.read_csv(handle, memory_map=True, compression=compression), + expected, + ) -def test_memory_map_file_handle(all_parsers): +def test_memory_map_compression(all_parsers, compression): """ - Support some buffers with memory_map=True. + Support memory map for compressed files. - GH 36997 + GH 37621 """ parser = all_parsers expected = DataFrame({"a": [1], "b": [2]}) - handle = StringIO() - expected.to_csv(handle, index=False) - handle.seek(0) + with tm.ensure_clean() as path: + expected.to_csv(path, index=False, compression=compression) - tm.assert_frame_equal( - parser.read_csv(handle, memory_map=True), - expected, - ) + tm.assert_frame_equal( + parser.read_csv(path, memory_map=True, compression=compression), + expected, + )