Skip to content

Commit

Permalink
ENH: memory_map for compressed files (#37621)
Browse files Browse the repository at this point in the history
  • Loading branch information
twoertwein committed Nov 5, 2020
1 parent e0699ca commit 22de62c
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 91 deletions.
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.2.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ Other enhancements
- :class:`DatetimeIndex` and :class:`Series` with ``datetime64`` or ``datetime64tz`` dtypes now support ``std`` (:issue:`37436`)
- :class:`Window` now supports all Scipy window types in ``win_type`` with flexible keyword argument support (:issue:`34556`)
- :meth:`testing.assert_index_equal` now has a ``check_order`` parameter that allows indexes to be checked in an order-insensitive manner (:issue:`37478`)
- :func:`read_csv` supports memory-mapping for compressed files (:issue:`37621`)

.. _whatsnew_120.api_breaking.python:

Expand Down
135 changes: 85 additions & 50 deletions pandas/io/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class IOHandles:
handle: Buffer
created_handles: List[Buffer] = dataclasses.field(default_factory=list)
is_wrapped: bool = False
is_mmap: bool = False

def close(self) -> None:
"""
Expand Down Expand Up @@ -604,49 +605,49 @@ def get_handle(
except ImportError:
pass

handles: List[Buffer] = list()

# Windows does not default to utf-8. Set to utf-8 for a consistent behavior
if encoding is None:
encoding = "utf-8"

# Convert pathlib.Path/py.path.local or string
path_or_buf = stringify_path(path_or_buf)
is_path = isinstance(path_or_buf, str)
f = path_or_buf
handle = stringify_path(path_or_buf)

compression, compression_args = get_compression_method(compression)
if is_path:
compression = infer_compression(path_or_buf, compression)
compression = infer_compression(handle, compression)

if compression:
# memory mapping needs to be the first step
handle, memory_map, handles = _maybe_memory_map(
handle, memory_map, encoding, mode, errors
)

is_path = isinstance(handle, str)
if compression:
# GZ Compression
if compression == "gzip":
if is_path:
assert isinstance(path_or_buf, str)
f = gzip.GzipFile(filename=path_or_buf, mode=mode, **compression_args)
assert isinstance(handle, str)
handle = gzip.GzipFile(filename=handle, mode=mode, **compression_args)
else:
f = gzip.GzipFile(
fileobj=path_or_buf, # type: ignore[arg-type]
handle = gzip.GzipFile(
fileobj=handle, # type: ignore[arg-type]
mode=mode,
**compression_args,
)

# BZ Compression
elif compression == "bz2":
f = bz2.BZ2File(
path_or_buf, mode=mode, **compression_args # type: ignore[arg-type]
handle = bz2.BZ2File(
handle, mode=mode, **compression_args # type: ignore[arg-type]
)

# ZIP Compression
elif compression == "zip":
f = _BytesZipFile(path_or_buf, mode, **compression_args)
if f.mode == "r":
handles.append(f)
zip_names = f.namelist()
handle = _BytesZipFile(handle, mode, **compression_args)
if handle.mode == "r":
handles.append(handle)
zip_names = handle.namelist()
if len(zip_names) == 1:
f = f.open(zip_names.pop())
handle = handle.open(zip_names.pop())
elif len(zip_names) == 0:
raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
else:
Expand All @@ -657,64 +658,52 @@ def get_handle(

# XZ Compression
elif compression == "xz":
f = get_lzma_file(lzma)(path_or_buf, mode)
handle = get_lzma_file(lzma)(handle, mode)

# Unrecognized Compression
else:
msg = f"Unrecognized compression type: {compression}"
raise ValueError(msg)

assert not isinstance(f, str)
handles.append(f)
assert not isinstance(handle, str)
handles.append(handle)

elif is_path:
# Check whether the filename is to be opened in binary mode.
# Binary mode does not support 'encoding' and 'newline'.
is_binary_mode = "b" in mode
assert isinstance(path_or_buf, str)
if encoding and not is_binary_mode:
assert isinstance(handle, str)
if encoding and "b" not in mode:
# Encoding
f = open(path_or_buf, mode, encoding=encoding, errors=errors, newline="")
handle = open(handle, mode, encoding=encoding, errors=errors, newline="")
else:
# Binary mode
f = open(path_or_buf, mode)
handles.append(f)
handle = open(handle, mode)
handles.append(handle)

# Convert BytesIO or file objects passed with an encoding
is_wrapped = False
if is_text and (
compression
or isinstance(f, need_text_wrapping)
or "b" in getattr(f, "mode", "")
or isinstance(handle, need_text_wrapping)
or "b" in getattr(handle, "mode", "")
):
f = TextIOWrapper(
f, encoding=encoding, errors=errors, newline="" # type: ignore[arg-type]
handle = TextIOWrapper(
handle, # type: ignore[arg-type]
encoding=encoding,
errors=errors,
newline="",
)
handles.append(f)
handles.append(handle)
# do not mark as wrapped when the user provided a string
is_wrapped = not is_path

if memory_map and hasattr(f, "fileno"):
assert not isinstance(f, str)
try:
wrapped = cast(mmap.mmap, _MMapWrapper(f)) # type: ignore[arg-type]
f.close()
handles.remove(f)
handles.append(wrapped)
f = wrapped
except Exception:
# we catch any errors that may have occurred
# because that is consistent with the lower-level
# functionality of the C engine (pd.read_csv), so
# leave the file handler as is then
pass

handles.reverse() # close the most recently added buffer first
assert not isinstance(f, str)
assert not isinstance(handle, str)
return IOHandles(
handle=f,
handle=handle,
created_handles=handles,
is_wrapped=is_wrapped,
is_mmap=memory_map,
)


Expand Down Expand Up @@ -778,9 +767,16 @@ class _MMapWrapper(abc.Iterator):
"""

def __init__(self, f: IO):
    """
    Wrap an open file in a read-only memory map.

    The stream-state queries (``seekable``/``readable``/``writable``) are
    cached up front because the caller closes ``f`` immediately after
    wrapping it and the ``mmap.mmap`` object does not provide those
    methods itself; ``__getattr__`` serves them from this cache.

    Parameters
    ----------
    f : IO
        An open file object exposing a real file descriptor (``fileno``).
        Any error raised by ``mmap.mmap`` (e.g. for objects without a
        usable descriptor) propagates to the caller.
    """
    self.attributes = {}
    # NOTE: io streams spell the method "writable" (io.IOBase.writable);
    # the previous spelling "writeable" never matched, so that query was
    # silently skipped and later delegated to the mmap after close.
    for attribute in ("seekable", "readable", "writable"):
        if not hasattr(f, attribute):
            continue
        self.attributes[attribute] = getattr(f, attribute)()
    self.mmap = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

def __getattr__(self, name: str):
    """
    Fallback lookup for attributes not set in ``__init__``: serve the
    cached stream-state queries first, then delegate to the underlying
    memory map.
    """
    try:
        cached = self.attributes[name]
    except KeyError:
        return getattr(self.mmap, name)
    return lambda: cached

def __iter__(self) -> "_MMapWrapper":
Expand All @@ -799,3 +795,42 @@ def __next__(self) -> str:
if newline == "":
raise StopIteration
return newline


def _maybe_memory_map(
    handle: FileOrBuffer,
    memory_map: bool,
    encoding: str,
    mode: str,
    errors: Optional[str],
) -> Tuple[FileOrBuffer, bool, List[Buffer]]:
    """
    Try to wrap ``handle`` in a memory map.

    Parameters
    ----------
    handle : str or buffer
        Path or open file/buffer that is a candidate for memory mapping.
    memory_map : bool
        Whether the caller requested memory mapping.
    encoding : str
        Encoding used when a path has to be opened in text mode.
    mode : str
        Mode used when a path has to be opened.
    errors : str, optional
        Encoding error handling, forwarded to ``open``.

    Returns
    -------
    A tuple of the (possibly wrapped) handle, whether memory mapping is
    actually in effect, and the handles opened here that the caller must
    eventually close.
    """
    handles: List[Buffer] = []
    # only paths and objects exposing a file descriptor can be mapped
    memory_map &= hasattr(handle, "fileno") or isinstance(handle, str)
    if not memory_map:
        return handle, memory_map, handles

    # need to open the file first
    if isinstance(handle, str):
        if encoding and "b" not in mode:
            # Encoding
            handle = open(handle, mode, encoding=encoding, errors=errors, newline="")
        else:
            # Binary mode
            handle = open(handle, mode)
        handles.append(handle)

    try:
        # the original handle is closed and replaced by the wrapper; the
        # mmap keeps the underlying descriptor mapping alive
        wrapped = cast(mmap.mmap, _MMapWrapper(handle))  # type: ignore[arg-type]
        handle.close()
        handles.remove(handle)
        # NOTE(review): when ``handle`` was supplied by the caller (not a
        # path opened above) it is not in ``handles``; ``remove`` then
        # raises after ``close`` already ran, so the fallback below would
        # return a closed handle — confirm against callers.
        handles.append(wrapped)
        handle = wrapped
    except Exception:
        # we catch any errors that may have occurred
        # because that is consistent with the lower-level
        # functionality of the C engine (pd.read_csv), so
        # leave the file handler as is then
        memory_map = False

    return handle, memory_map, handles
20 changes: 2 additions & 18 deletions pandas/io/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,7 @@
from pandas.core.series import Series
from pandas.core.tools import datetimes as tools

from pandas.io.common import (
get_compression_method,
get_filepath_or_buffer,
get_handle,
stringify_path,
validate_header_arg,
)
from pandas.io.common import get_filepath_or_buffer, get_handle, validate_header_arg
from pandas.io.date_converters import generic_parser

# BOM character (byte order mark)
Expand Down Expand Up @@ -1834,16 +1828,6 @@ def __init__(self, src, **kwds):

ParserBase.__init__(self, kwds)

if kwds.get("memory_map", False):
# memory-mapped files are directly handled by the TextReader.
src = stringify_path(src)

if get_compression_method(kwds.get("compression", None))[0] is not None:
raise ValueError(
"read_csv does not support compression with memory_map=True. "
+ "Please use memory_map=False instead."
)

self.handles = get_handle(
src,
mode="r",
Expand All @@ -1855,7 +1839,7 @@ def __init__(self, src, **kwds):
kwds.pop("encoding", None)
kwds.pop("memory_map", None)
kwds.pop("compression", None)
if kwds.get("memory_map", False) and hasattr(self.handles.handle, "mmap"):
if self.handles.is_mmap and hasattr(self.handles.handle, "mmap"):
self.handles.handle = self.handles.handle.mmap

# #2442
Expand Down
44 changes: 21 additions & 23 deletions pandas/tests/io/parser/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2275,40 +2275,38 @@ def test_read_csv_file_handle(all_parsers, io_class, encoding):
assert not handle.closed


def test_memory_map_file_handle_silent_fallback(all_parsers, compression):
    """
    Do not fail for buffers with memory_map=True (cannot memory map BytesIO).

    GH 37621
    """
    # this span interleaved the deleted test (test_memory_map_compression_error)
    # with the added one; this is the post-commit version, which replaces the
    # old ValueError expectation with a silent fallback to non-mapped reading
    parser = all_parsers
    expected = DataFrame({"a": [1], "b": [2]})

    handle = BytesIO()
    expected.to_csv(handle, index=False, compression=compression, mode="wb")
    handle.seek(0)

    tm.assert_frame_equal(
        parser.read_csv(handle, memory_map=True, compression=compression),
        expected,
    )


def test_memory_map_compression(all_parsers, compression):
    """
    Support memory map for compressed files.

    GH 37621
    """
    # this span interleaved the deleted test (test_memory_map_file_handle)
    # with the added one; this is the post-commit version, exercising the
    # new memory_map + compression round trip through a real file on disk
    parser = all_parsers
    expected = DataFrame({"a": [1], "b": [2]})

    with tm.ensure_clean() as path:
        expected.to_csv(path, index=False, compression=compression)

        tm.assert_frame_equal(
            parser.read_csv(path, memory_map=True, compression=compression),
            expected,
        )

0 comments on commit 22de62c

Please sign in to comment.