ENH: memory_map for compressed files #37621

Merged 1 commit on Nov 5, 2020
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.2.0.rst
@@ -230,6 +230,7 @@ Other enhancements
 - :class:`DatetimeIndex` and :class:`Series` with ``datetime64`` or ``datetime64tz`` dtypes now support ``std`` (:issue:`37436`)
 - :class:`Window` now supports all Scipy window types in ``win_type`` with flexible keyword argument support (:issue:`34556`)
 - :meth:`testing.assert_index_equal` now has a ``check_order`` parameter that allows indexes to be checked in an order-insensitive manner (:issue:`37478`)
+- :func:`read_csv` supports memory-mapping for compressed files (:issue:`37621`)
 
 .. _whatsnew_120.api_breaking.python:
 
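
For context, a minimal usage sketch of the behavior this whatsnew entry describes (file name and data are illustrative; assumes pandas >= 1.2):

import pandas as pd

df = pd.DataFrame({"a": [1], "b": [2]})
df.to_csv("data.csv.gz", index=False, compression="gzip")

# Before this PR, combining memory_map=True with compression raised a
# ValueError; now the compressed file is memory-mapped first and the
# decompressor reads from the mapping.
roundtripped = pd.read_csv("data.csv.gz", memory_map=True, compression="gzip")
assert roundtripped.equals(df)
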
135 changes: 85 additions & 50 deletions pandas/io/common.py
@@ -107,6 +107,7 @@ class IOHandles:
     handle: Buffer
     created_handles: List[Buffer] = dataclasses.field(default_factory=list)
     is_wrapped: bool = False
+    is_mmap: bool = False
 
     def close(self) -> None:
         """
@@ -604,49 +605,49 @@ def get_handle(
     except ImportError:
         pass
 
-    handles: List[Buffer] = list()
-
     # Windows does not default to utf-8. Set to utf-8 for a consistent behavior
     if encoding is None:
         encoding = "utf-8"
 
     # Convert pathlib.Path/py.path.local or string
-    path_or_buf = stringify_path(path_or_buf)
-    is_path = isinstance(path_or_buf, str)
-    f = path_or_buf
+    handle = stringify_path(path_or_buf)
 
     compression, compression_args = get_compression_method(compression)
-    if is_path:
-        compression = infer_compression(path_or_buf, compression)
+    compression = infer_compression(handle, compression)
 
-    if compression:
-
+    # memory mapping needs to be the first step
+    handle, memory_map, handles = _maybe_memory_map(
+        handle, memory_map, encoding, mode, errors
+    )
+
+    is_path = isinstance(handle, str)
+    if compression:
         # GZ Compression
         if compression == "gzip":
             if is_path:
-                assert isinstance(path_or_buf, str)
-                f = gzip.GzipFile(filename=path_or_buf, mode=mode, **compression_args)
+                assert isinstance(handle, str)
+                handle = gzip.GzipFile(filename=handle, mode=mode, **compression_args)
             else:
-                f = gzip.GzipFile(
-                    fileobj=path_or_buf,  # type: ignore[arg-type]
+                handle = gzip.GzipFile(
+                    fileobj=handle,  # type: ignore[arg-type]
                     mode=mode,
                     **compression_args,
                 )
 
         # BZ Compression
         elif compression == "bz2":
-            f = bz2.BZ2File(
-                path_or_buf, mode=mode, **compression_args  # type: ignore[arg-type]
+            handle = bz2.BZ2File(
+                handle, mode=mode, **compression_args  # type: ignore[arg-type]
             )
 
         # ZIP Compression
         elif compression == "zip":
-            f = _BytesZipFile(path_or_buf, mode, **compression_args)
-            if f.mode == "r":
-                handles.append(f)
-                zip_names = f.namelist()
+            handle = _BytesZipFile(handle, mode, **compression_args)
+            if handle.mode == "r":
+                handles.append(handle)
+                zip_names = handle.namelist()
                 if len(zip_names) == 1:
-                    f = f.open(zip_names.pop())
+                    handle = handle.open(zip_names.pop())
                 elif len(zip_names) == 0:
                     raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
                 else:
@@ -657,64 +658,52 @@ def get_handle(
 
         # XZ Compression
         elif compression == "xz":
-            f = get_lzma_file(lzma)(path_or_buf, mode)
+            handle = get_lzma_file(lzma)(handle, mode)
 
         # Unrecognized Compression
         else:
             msg = f"Unrecognized compression type: {compression}"
             raise ValueError(msg)
 
-        assert not isinstance(f, str)
-        handles.append(f)
+        assert not isinstance(handle, str)
+        handles.append(handle)
 
     elif is_path:
         # Check whether the filename is to be opened in binary mode.
         # Binary mode does not support 'encoding' and 'newline'.
-        is_binary_mode = "b" in mode
-        assert isinstance(path_or_buf, str)
-        if encoding and not is_binary_mode:
+        assert isinstance(handle, str)
+        if encoding and "b" not in mode:
             # Encoding
-            f = open(path_or_buf, mode, encoding=encoding, errors=errors, newline="")
+            handle = open(handle, mode, encoding=encoding, errors=errors, newline="")
         else:
             # Binary mode
-            f = open(path_or_buf, mode)
-        handles.append(f)
+            handle = open(handle, mode)
+        handles.append(handle)
 
     # Convert BytesIO or file objects passed with an encoding
     is_wrapped = False
     if is_text and (
         compression
-        or isinstance(f, need_text_wrapping)
-        or "b" in getattr(f, "mode", "")
+        or isinstance(handle, need_text_wrapping)
+        or "b" in getattr(handle, "mode", "")
     ):
-        f = TextIOWrapper(
-            f, encoding=encoding, errors=errors, newline=""  # type: ignore[arg-type]
+        handle = TextIOWrapper(
+            handle,  # type: ignore[arg-type]
+            encoding=encoding,
+            errors=errors,
+            newline="",
         )
-        handles.append(f)
+        handles.append(handle)
         # do not mark as wrapped when the user provided a string
         is_wrapped = not is_path
 
-    if memory_map and hasattr(f, "fileno"):
-        assert not isinstance(f, str)
-        try:
-            wrapped = cast(mmap.mmap, _MMapWrapper(f))  # type: ignore[arg-type]
-            f.close()
-            handles.remove(f)
-            handles.append(wrapped)
-            f = wrapped
-        except Exception:
-            # we catch any errors that may have occurred
-            # because that is consistent with the lower-level
-            # functionality of the C engine (pd.read_csv), so
-            # leave the file handler as is then
-            pass
-
     handles.reverse()  # close the most recently added buffer first
-    assert not isinstance(f, str)
+    assert not isinstance(handle, str)
     return IOHandles(
-        handle=f,
+        handle=handle,
         created_handles=handles,
         is_wrapped=is_wrapped,
+        is_mmap=memory_map,
     )
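
One detail worth calling out in the hunk above: created_handles is closed in reverse creation order, so each wrapper flushes into the layer beneath it before that layer is finalized. A self-contained sketch of that design choice (not pandas code; file name illustrative):

import bz2
from io import TextIOWrapper

created = []
raw = open("data.csv.bz2", "wb")
created.append(raw)
comp = bz2.BZ2File(raw, mode="wb")  # compression layer on top of the raw file
created.append(comp)
text = TextIOWrapper(comp, encoding="utf-8")  # text layer on top of that
created.append(text)

text.write("a,b\n1,2\n")

created.reverse()  # close the most recently added wrapper first
for handle in created:
    # TextIOWrapper flushes into BZ2File before BZ2File writes its
    # stream terminator; closing an already-closed handle is a no-op
    handle.close()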


@@ -778,9 +767,16 @@ class _MMapWrapper(abc.Iterator):
     """
 
     def __init__(self, f: IO):
+        self.attributes = {}
+        for attribute in ("seekable", "readable", "writeable"):
+            if not hasattr(f, attribute):
+                continue
+            self.attributes[attribute] = getattr(f, attribute)()
         self.mmap = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
 
     def __getattr__(self, name: str):
+        if name in self.attributes:
+            return lambda: self.attributes[name]
        return getattr(self.mmap, name)
 
     def __iter__(self) -> "_MMapWrapper":
@@ -799,3 +795,42 @@ def __next__(self) -> str:
         if newline == "":
             raise StopIteration
         return newline
+
+
+def _maybe_memory_map(
+    handle: FileOrBuffer,
+    memory_map: bool,
+    encoding: str,
+    mode: str,
+    errors: Optional[str],
+) -> Tuple[FileOrBuffer, bool, List[Buffer]]:
+    """Try to use memory map file/buffer."""
+    handles: List[Buffer] = []
+    memory_map &= hasattr(handle, "fileno") or isinstance(handle, str)
+    if not memory_map:
+        return handle, memory_map, handles
+
+    # need to open the file first
+    if isinstance(handle, str):
+        if encoding and "b" not in mode:
+            # Encoding
+            handle = open(handle, mode, encoding=encoding, errors=errors, newline="")
+        else:
+            # Binary mode
+            handle = open(handle, mode)
+        handles.append(handle)
+
+    try:
+        wrapped = cast(mmap.mmap, _MMapWrapper(handle))  # type: ignore[arg-type]
+        handle.close()
+        handles.remove(handle)
+        handles.append(wrapped)
+        handle = wrapped
+    except Exception:
+        # we catch any errors that may have occurred
+        # because that is consistent with the lower-level
+        # functionality of the C engine (pd.read_csv), so
+        # leave the file handler as is then
+        memory_map = False
+
+    return handle, memory_map, handles
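
As a reading aid, a rough standalone sketch (not pandas code) of the layering _maybe_memory_map sets up: the raw file is memory-mapped first, the original handle is closed, and any decompressor then reads from the mapping. The file name is illustrative and assumed to exist.

import gzip
import mmap

with open("data.csv.gz", "rb") as f:
    # map the compressed bytes; the mapping stays valid after f is closed,
    # which is why _MMapWrapper caches seekable/readable/writeable up front
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

# the decompressor sits on top of the memory map, mirroring get_handle's
# new ordering: "memory mapping needs to be the first step"
with gzip.GzipFile(fileobj=mm, mode="r") as gz:
    print(gz.read().decode("utf-8"))
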
20 changes: 2 additions & 18 deletions pandas/io/parsers.py
@@ -63,13 +63,7 @@
 from pandas.core.series import Series
 from pandas.core.tools import datetimes as tools
 
-from pandas.io.common import (
-    get_compression_method,
-    get_filepath_or_buffer,
-    get_handle,
-    stringify_path,
-    validate_header_arg,
-)
+from pandas.io.common import get_filepath_or_buffer, get_handle, validate_header_arg
 from pandas.io.date_converters import generic_parser
 
 # BOM character (byte order mark)
@@ -1834,16 +1828,6 @@ def __init__(self, src, **kwds):
 
         ParserBase.__init__(self, kwds)
 
-        if kwds.get("memory_map", False):
-            # memory-mapped files are directly handled by the TextReader.
-            src = stringify_path(src)
-
-            if get_compression_method(kwds.get("compression", None))[0] is not None:
-                raise ValueError(
-                    "read_csv does not support compression with memory_map=True. "
-                    + "Please use memory_map=False instead."
-                )
-
         self.handles = get_handle(
             src,
             mode="r",
@@ -1855,7 +1839,7 @@ def __init__(self, src, **kwds):
         kwds.pop("encoding", None)
         kwds.pop("memory_map", None)
         kwds.pop("compression", None)
-        if kwds.get("memory_map", False) and hasattr(self.handles.handle, "mmap"):
+        if self.handles.is_mmap and hasattr(self.handles.handle, "mmap"):
             self.handles.handle = self.handles.handle.mmap
 
         # #2442
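
For illustration, a hedged sketch (a simplified mirror, not the pandas source) of the consumer-side change above: CParserWrapper no longer re-checks kwds but trusts the IOHandles returned by get_handle, handing the raw mmap to the C TextReader only when memory mapping actually happened.

from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class IOHandles:  # simplified mirror of the dataclass in pandas/io/common.py
    handle: Any
    created_handles: List[Any] = field(default_factory=list)
    is_wrapped: bool = False
    is_mmap: bool = False


def unwrap_for_c_reader(handles: IOHandles) -> Any:
    # mirrors the updated CParserWrapper logic shown in the diff above
    if handles.is_mmap and hasattr(handles.handle, "mmap"):
        return handles.handle.mmap
    return handles.handle
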
44 changes: 21 additions & 23 deletions pandas/tests/io/parser/test_common.py
@@ -2275,40 +2275,38 @@ def test_read_csv_file_handle(all_parsers, io_class, encoding):
     assert not handle.closed
 
 
-def test_memory_map_compression_error(c_parser_only):
+def test_memory_map_file_handle_silent_fallback(all_parsers, compression):
     """
-    c-parsers do not support memory_map=True with compression.
+    Do not fail for buffers with memory_map=True (cannot memory map BytesIO).
 
-    GH 36997
+    GH 37621
     """
-    parser = c_parser_only
-    df = DataFrame({"a": [1], "b": [2]})
-    msg = (
-        "read_csv does not support compression with memory_map=True. "
-        + "Please use memory_map=False instead."
-    )
+    parser = all_parsers
+    expected = DataFrame({"a": [1], "b": [2]})
 
-    with tm.ensure_clean() as path:
-        df.to_csv(path, compression="gzip", index=False)
+    handle = BytesIO()
+    expected.to_csv(handle, index=False, compression=compression, mode="wb")
+    handle.seek(0)
 
-        with pytest.raises(ValueError, match=msg):
-            parser.read_csv(path, memory_map=True, compression="gzip")
+    tm.assert_frame_equal(
+        parser.read_csv(handle, memory_map=True, compression=compression),
+        expected,
+    )
 
 
-def test_memory_map_file_handle(all_parsers):
+def test_memory_map_compression(all_parsers, compression):
     """
-    Support some buffers with memory_map=True.
+    Support memory map for compressed files.
 
-    GH 36997
+    GH 37621
     """
     parser = all_parsers
     expected = DataFrame({"a": [1], "b": [2]})
 
-    handle = StringIO()
-    expected.to_csv(handle, index=False)
-    handle.seek(0)
+    with tm.ensure_clean() as path:
+        expected.to_csv(path, index=False, compression=compression)
 
-    tm.assert_frame_equal(
-        parser.read_csv(handle, memory_map=True),
-        expected,
-    )
+        tm.assert_frame_equal(
+            parser.read_csv(path, memory_map=True, compression=compression),
+            expected,
+        )