Proof of concept for Copy-on-Write implementation #41878

Closed
Changes from 11 commits
31 commits
462526e
POC for Copy-on-Write
jorisvandenbossche Apr 29, 2021
41ee2b7
clean-up ArrayManager.copy implementation
jorisvandenbossche Jun 8, 2021
7a8dffc
add some comments / docstrings
jorisvandenbossche Jun 8, 2021
1c964be
Merge remote-tracking branch 'upstream/master' into am-experiment-cow
jorisvandenbossche Jun 25, 2021
96b6d71
fix series slice + del operator
jorisvandenbossche Jun 25, 2021
17cedb9
fix pickle and column_setitem with dtype changes
jorisvandenbossche Jun 25, 2021
f4614c2
fix setitem on single column for full slice + update frame tests
jorisvandenbossche Jun 25, 2021
81b09c2
Merge remote-tracking branch 'upstream/master' into am-experiment-cow
jorisvandenbossche Jun 26, 2021
7f183de
fix ref tracking in slicing columns
jorisvandenbossche Jun 26, 2021
693bc4f
fix series setitem -> don't update parent cache
jorisvandenbossche Jun 28, 2021
a154591
update indexing tests with new behaviour to get them passing
jorisvandenbossche Jun 28, 2021
71370c4
Merge remote-tracking branch 'upstream/master' into am-experiment-cow
jorisvandenbossche Jul 12, 2021
4e785d7
Merge remote-tracking branch 'upstream/master' into am-experiment-cow
jorisvandenbossche Aug 4, 2021
6741340
fix new test + skip Series.update tests for now
jorisvandenbossche Aug 4, 2021
c4527e9
remove chained assignment outside indexing tests
jorisvandenbossche Aug 4, 2021
b33aaf2
fix DataFrame.fillna + more chained setitem
jorisvandenbossche Aug 4, 2021
9fdad69
Merge remote-tracking branch 'upstream/master' into am-experiment-cow
jorisvandenbossche Aug 7, 2021
f5da03f
reindex/take on columns to use copy-on-write
jorisvandenbossche Aug 8, 2021
c632757
fix some typing issues
jorisvandenbossche Aug 8, 2021
e4a5f33
fix dtype in test for windows
jorisvandenbossche Aug 8, 2021
5efad50
fix dtype in test for windows
jorisvandenbossche Aug 8, 2021
8ea48cc
Merge remote-tracking branch 'upstream/master' into am-experiment-cow
jorisvandenbossche Aug 9, 2021
79b7a30
Merge remote-tracking branch 'upstream/master' into am-experiment-cow
jorisvandenbossche Oct 11, 2021
cf5c7e2
Merge remote-tracking branch 'upstream/master' into am-experiment-cow
jorisvandenbossche Nov 11, 2021
297662a
[ArrayManager] Array version of putmask logic
jorisvandenbossche Nov 11, 2021
a011a06
fixup copy-on-write in putmask
jorisvandenbossche Nov 11, 2021
cc09001
Update BlockManager expected results after recent behaviour changes
jorisvandenbossche Nov 11, 2021
37e7ce0
limit changes to putmask for this PR
jorisvandenbossche Nov 22, 2021
e34b9b2
Merge remote-tracking branch 'upstream/master' into am-experiment-cow
jorisvandenbossche Nov 22, 2021
64377e0
Merge remote-tracking branch 'upstream/master' into am-experiment-cow
jorisvandenbossche Nov 26, 2021
0f28095
address feedback
jorisvandenbossche Nov 26, 2021
4 changes: 2 additions & 2 deletions pandas/core/frame.py
@@ -4913,7 +4913,7 @@ def rename(
index: Renamer | None = None,
columns: Renamer | None = None,
axis: Axis | None = None,
- copy: bool = True,
+ copy: bool | None = None,
Member Author
The copy=None is what changes DataFrame.rename to use a shallow copy with CoW instead of doing a full copy for the ArrayManager.
This is eventually passed to self.copy(deep=copy), and deep=None is used to signal a shallow copy for ArrayManager while for now preserving the deep copy for BlockManager.
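
A minimal sketch of the tri-state flag described in this comment. The function name `resolve_deep` and its `uses_array_manager` parameter are hypothetical and only illustrate the dispatch; in the PR each manager's own `copy` method interprets `deep=None`.

```python
from __future__ import annotations


def resolve_deep(deep: bool | None, uses_array_manager: bool) -> bool:
    """Map the tri-state ``deep`` flag onto a concrete copy mode."""
    if deep is None:
        # ArrayManager: shallow copy guarded by Copy-on-Write.
        # BlockManager: keep the existing deep-copy behaviour for now.
        return not uses_array_manager
    # An explicit True/False is always honoured as given.
    return deep
```

With this mapping, `rename(copy=None)` stays a deep copy for BlockManager users, while ArrayManager users get the cheaper shallow copy.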

inplace: bool = False,
level: Level | None = None,
errors: str = "ignore",
@@ -5747,7 +5747,7 @@ class max type
if inplace:
new_obj = self
else:
- new_obj = self.copy()
+ new_obj = self.copy(deep=None)

new_index = ibase.default_index(len(new_obj))
if level is not None:
6 changes: 4 additions & 2 deletions pandas/core/generic.py
@@ -992,7 +992,7 @@ def rename(
index: Renamer | None = None,
columns: Renamer | None = None,
axis: Axis | None = None,
- copy: bool_t = True,
+ copy: bool_t | None = None,
inplace: bool_t = False,
level: Level | None = None,
errors: str = "ignore",
@@ -3887,6 +3887,8 @@ def _check_setitem_copy(self, stacklevel=4, t="setting", force=False):
df.iloc[0:5]['group'] = 'a'

"""
if isinstance(self._mgr, (ArrayManager, SingleArrayManager)):
return
# return early if the check is not needed
if not (force or self._is_copy):
return
@@ -5817,7 +5819,7 @@ def astype(
return result

@final
- def copy(self: FrameOrSeries, deep: bool_t = True) -> FrameOrSeries:
+ def copy(self: FrameOrSeries, deep: bool_t | None = True) -> FrameOrSeries:
"""
Make a copy of this object's indices and data.

10 changes: 10 additions & 0 deletions pandas/core/indexing.py
@@ -1894,6 +1894,16 @@ def _setitem_single_column(self, loc: int, value, plane_indexer):
"""
pi = plane_indexer

Member
#42887 would make for a good precursor

if not hasattr(self.obj._mgr, "blocks"):
Contributor
shouldn't this be an ArrayManager test? why the different semantics?

Member Author
This is just a check to know if it is an ArrayManager, without actually importing it (core/indexing.py currently doesn't import anything from internals).
I think in another PR that might not have been merged, I added an attribute on the DataFrame that returns True/False for this, which might be easier to reuse in multiple places.

Member
yah we should for sure have 1 canonical way of doing this check so we can grep for all the places we do it

Member Author
-> #44676
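
One way to get the single canonical check asked for in this thread is a small duck-typed helper. The sketch below is only an illustration: `uses_array_manager`, `FakeBlockManager`, and `FakeArrayManager` are hypothetical names, not pandas API.

```python
class FakeBlockManager:
    """Hypothetical stand-in: block-based managers expose ``blocks``."""

    def __init__(self):
        self.blocks = ()


class FakeArrayManager:
    """Hypothetical stand-in: array-based managers expose ``arrays`` only."""

    def __init__(self):
        self.arrays = []


def uses_array_manager(mgr) -> bool:
    # Duck-typed check, mirroring `not hasattr(mgr, "blocks")` in the diff,
    # so the calling module needs no import from the internals package.
    return not hasattr(mgr, "blocks")
```

Keeping the check in one named function makes every call site easy to find with a plain text search.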

# ArrayManager
Member
comment on why this is AM-specific?

if com.is_null_slice(pi) or com.is_full_slice(pi, len(self.obj)):
Member
xref #44353 (doesn't need to be resolved for this PR, but will make lots of things easier)

arr = self.obj._sanitize_column(value)
self.obj._mgr.iset(loc, arr)
else:
self.obj._mgr.column_setitem(loc, plane_indexer, value)
self.obj._clear_item_cache()
return

ser = self.obj._ixs(loc, axis=1)

# perform the equivalent of a setitem on the info axis
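
The ArrayManager branch added in this hunk chooses between replacing a whole column and writing into part of it. A rough sketch of that decision follows; the two slice helpers are simplified re-implementations written for this example (assumptions, not the `com` functions themselves), and `choose_path` is a hypothetical name.

```python
def is_null_slice(obj) -> bool:
    """True for ``slice(None)``, i.e. a bare ``:``."""
    return (
        isinstance(obj, slice)
        and obj.start is None
        and obj.stop is None
        and obj.step is None
    )


def is_full_slice(obj, length: int) -> bool:
    """True for ``slice(0, length)`` with no step."""
    return (
        isinstance(obj, slice)
        and obj.start == 0
        and obj.stop == length
        and obj.step is None
    )


def choose_path(indexer, n_rows: int) -> str:
    # Whole-column assignment can swap in a new array ("iset");
    # anything else has to write into the existing one ("column_setitem").
    if is_null_slice(indexer) or is_full_slice(indexer, n_rows):
        return "iset"
    return "column_setitem"
```

Only the partial-write path needs the Copy-on-Write check, because swapping in a new array never touches data that another object might share.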
112 changes: 104 additions & 8 deletions pandas/core/internals/array_manager.py
@@ -9,6 +9,7 @@
Callable,
TypeVar,
)
import weakref

import numpy as np

@@ -110,6 +111,7 @@ class BaseArrayManager(DataManager):
----------
arrays : Sequence of arrays
axes : Sequence of Index
refs : Sequence of weakrefs or None, optional
verify_integrity : bool, default True

"""
@@ -121,11 +123,13 @@ class BaseArrayManager(DataManager):

arrays: list[np.ndarray | ExtensionArray]
_axes: list[Index]
refs: list[weakref.ref | None] | None

def __init__(
self,
arrays: list[np.ndarray | ExtensionArray],
axes: list[Index],
refs: list[weakref.ref | None] | None,
verify_integrity: bool = True,
):
raise NotImplementedError
@@ -176,11 +180,33 @@ def is_consolidated(self) -> bool:
def _consolidate_inplace(self) -> None:
pass

def _has_no_reference(self, i: int):
"""
Check whether column `i` has any references
(whether it references another array or is itself being referenced).

Returns True if the column has no references.
"""
return (self.refs is None or self.refs[i] is None) and weakref.getweakrefcount(
self.arrays[i]
) == 0

def _clear_reference(self, i: int):
"""
Clear any reference for column `i`.
"""
if self.refs is not None:
self.refs[i] = None
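
The two-sided check in `_has_no_reference` can be tried out with the standard library alone. In this sketch `Column` is a hypothetical stand-in for a column array, and `has_no_reference` is a free-function version of the method above; it assumes CPython, where dropping the last strong reference to a weakref object releases it immediately.

```python
import weakref


class Column:
    """Hypothetical stand-in for a column array; plain class instances
    support weak references."""

    def __init__(self, values):
        self.values = list(values)


def has_no_reference(column, ref) -> bool:
    # `ref` is the weakref this holder keeps to a parent column (or None);
    # getweakrefcount tells us whether anyone else points at `column`.
    return ref is None and weakref.getweakrefcount(column) == 0


parent = Column([1, 2, 3])
before = has_no_reference(parent, None)  # nothing refers to it yet

child_ref = weakref.ref(parent)  # a "child" now tracks the parent
after = has_no_reference(parent, None)

del child_ref  # dropping the child's weakref frees the parent again
released = has_no_reference(parent, None)
```

The parent never stores anything about its children; it learns that it is shared purely from the weakref count on its own array.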

def get_dtypes(self):
return np.array([arr.dtype for arr in self.arrays], dtype="object")

# TODO setstate getstate

def __reduce__(self):
# to avoid pickling the refs
return type(self), (self.arrays, self._axes)
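
Weak references cannot be pickled, which is why `__reduce__` above leaves `refs` out. The sketch below replays by hand what pickle does with the `__reduce__` result; `Holder` and `Column` are hypothetical stand-ins, and the axes are omitted for brevity.

```python
import weakref


class Column:
    """Hypothetical stand-in for a column array."""

    def __init__(self, values):
        self.values = list(values)


class Holder:
    """Hypothetical manager-like object holding arrays plus weakrefs."""

    def __init__(self, arrays, refs=None):
        self.arrays = arrays
        self.refs = refs

    def __reduce__(self):
        # Rebuild from the arrays only; `refs` falls back to None on load.
        return type(self), (self.arrays,)


col = Column([1, 2, 3])
holder = Holder([col], refs=[weakref.ref(col)])

# pickle stores the callable and its arguments, then calls it when loading.
cls, args = holder.__reduce__()
restored = cls(*args)
```

A restored object therefore starts without any tracked references, which is safe because unpickling creates fresh arrays that nothing else shares.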

def __repr__(self) -> str:
output = type(self).__name__
output += f"\nIndex: {self._axes[0]}"
@@ -352,6 +378,7 @@ def where(self: T, other, cond, align: bool, errors: str) -> T:
# return self.apply_with_block("setitem", indexer=indexer, value=value)

def putmask(self, mask, new, align: bool = True):
# TODO(CoW) check references to copy if needed
if align:
align_keys = ["new", "mask"]
else:
@@ -516,6 +543,10 @@ def copy(self: T, deep=True) -> T:
-------
BlockManager
"""
if deep is None:
# use shallow copy
deep = False

# this preserves the notion of view copying of axes
if deep:
# hit in e.g. tests.io.json.test_pandas
@@ -529,9 +560,12 @@ def copy_func(ax):

if deep:
new_arrays = [arr.copy() for arr in self.arrays]
refs = None
else:
- new_arrays = self.arrays
- return type(self)(new_arrays, new_axes)
+ new_arrays = list(self.arrays)
+ refs = [weakref.ref(arr) for arr in self.arrays]
+
+ return type(self)(new_arrays, new_axes, refs, verify_integrity=False)
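
A toy version of the shallow-copy path may make the bookkeeping easier to follow. `ToyManager` and `Column` are hypothetical stand-ins that ignore axes; only the handling of `deep` and `refs` follows the hunk above.

```python
import weakref


class Column:
    """Hypothetical stand-in for a column array."""

    def __init__(self, values):
        self.values = list(values)

    def copy(self):
        return Column(self.values)


class ToyManager:
    def __init__(self, arrays, refs=None):
        self.arrays = arrays
        self.refs = refs

    def copy(self, deep=True):
        if deep is None:
            deep = False  # None requests the Copy-on-Write shallow copy
        if deep:
            return ToyManager([arr.copy() for arr in self.arrays], None)
        # New list, same column objects, plus one weakref per shared column.
        refs = [weakref.ref(arr) for arr in self.arrays]
        return ToyManager(list(self.arrays), refs)


original = ToyManager([Column([1, 2]), Column([3, 4])])
shallow = original.copy(deep=None)
deep = original.copy(deep=True)
```

Copying the list itself (`list(self.arrays)`) matters: replacing a column in the copy must not change which columns the original holds.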

def reindex_indexer(
self: T,
@@ -599,14 +633,19 @@ def _reindex_indexer(

if axis == 1:
new_arrays = []
refs = []
for i in indexer:
if i == -1:
arr = self._make_na_array(
fill_value=fill_value, use_na_proxy=use_na_proxy
)
ref = None
else:
# reusing full column array -> track with reference
arr = self.arrays[i]
ref = weakref.ref(arr)
new_arrays.append(arr)
refs.append(ref)

else:
validate_indices(indexer, len(self._axes[0]))
@@ -625,11 +664,14 @@ def _reindex_indexer(
)
for arr in self.arrays
]
# selecting rows with take always creates a copy -> no need to
# track references to original arrays
refs = None

new_axes = list(self._axes)
new_axes[axis] = new_axis

- return type(self)(new_arrays, new_axes, verify_integrity=False)
+ return type(self)(new_arrays, new_axes, refs, verify_integrity=False)
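
The column branch of `_reindex_indexer` can be sketched in isolation. `take_columns` and `Column` are hypothetical names, and filling with `None` stands in for whatever `_make_na_array` produces.

```python
import weakref


class Column:
    """Hypothetical stand-in for a column array."""

    def __init__(self, values):
        self.values = list(values)


def take_columns(arrays, indexer, n_rows):
    """Select columns by position; -1 marks a column missing from the source."""
    new_arrays, refs = [], []
    for i in indexer:
        if i == -1:
            arr = Column([None] * n_rows)  # fresh all-missing column
            ref = None  # owns its data, nothing to track
        else:
            arr = arrays[i]  # reuse the existing column object
            ref = weakref.ref(arr)  # so remember where it came from
        new_arrays.append(arr)
        refs.append(ref)
    return new_arrays, refs


source = [Column([1, 2]), Column([3, 4])]
picked, refs = take_columns(source, [1, -1, 0], n_rows=2)
```

The row branch needs no such tracking because, as the comment in the diff notes, a take along rows always produces new arrays.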

def take(self: T, indexer, axis: int = 1, verify: bool = True) -> T:
"""
@@ -693,12 +735,14 @@ def __init__(
self,
arrays: list[np.ndarray | ExtensionArray],
axes: list[Index],
refs: list[weakref.ref | None] | None = None,
verify_integrity: bool = True,
):
# Note: we are storing the axes in "_axes" in the (row, columns) order
# which contrasts the order how it is stored in BlockManager
self._axes = axes
self.arrays = arrays
self.refs = refs

if verify_integrity:
self._axes = [ensure_index(ax) for ax in axes]
@@ -728,6 +772,12 @@ def _verify_integrity(self) -> None:
"Passed arrays should be 1-dimensional, got array with "
f"{arr.ndim} dimensions instead."
)
if self.refs is not None:
if not len(self.refs) == n_columns:
raise ValueError(
"Number of passed refs must equal the size of the column Index: "
f"{len(self.refs)} refs vs {n_columns} columns."
)

# --------------------------------------------------------------------
# Indexing
@@ -763,20 +813,27 @@ def get_slice(self, slobj: slice, axis: int = 0) -> ArrayManager:

if axis == 0:
arrays = [arr[slobj] for arr in self.arrays]
# slicing results in views -> track references to original arrays
# TODO possible to optimize this with a single ref to the full ArrayManager?
refs = [weakref.ref(arr) for arr in self.arrays]
elif axis == 1:
arrays = self.arrays[slobj]
# track reference to subset of column arrays
refs = [weakref.ref(arr) for arr in arrays]

new_axes = list(self._axes)
new_axes[axis] = new_axes[axis]._getitem_slice(slobj)

- return type(self)(arrays, new_axes, verify_integrity=False)
+ return type(self)(arrays, new_axes, refs, verify_integrity=False)
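
Row slices need reference tracking because a slice is a view on the parent's buffer. The sketch below shows the leak that Copy-on-Write is meant to prevent, using a standard-library `memoryview` over an `array.array` as a stand-in for a NumPy array (an assumption made to keep the example dependency-free).

```python
from array import array

parent = array("d", [1.0, 2.0, 3.0, 4.0])

# Slicing a memoryview, like basic slicing of a NumPy array, returns a view
# on the same buffer rather than a copy.
view = memoryview(parent)[1:3]
view[0] = 99.0  # write through the view ...

leaked = parent[1]  # ... and the parent sees it
```

With the refs recorded in `get_slice`, a write to the sliced object can first copy its array, so the parent is left unchanged.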

def iget(self, i: int) -> SingleArrayManager:
"""
Return the data as a SingleArrayManager.
"""
values = self.arrays[i]
- return SingleArrayManager([values], [self._axes[0]])
+ # getting single column array for Series -> track reference to original
+ ref = weakref.ref(values)
+ return SingleArrayManager([values], [self._axes[0]], [ref])

def iget_values(self, i: int) -> ArrayLike:
"""
@@ -804,6 +861,7 @@ def iset(self, loc: int | slice | np.ndarray, value: ArrayLike):
Positional location (already bounds checked)
value : np.ndarray or ExtensionArray
"""
# TODO clear reference for item that is being overwritten
# single column -> single integer index
if lib.is_integer(loc):

@@ -880,6 +938,10 @@ def insert(self, loc: int, item: Hashable, value: ArrayLike) -> None:
# TODO is this copy needed?
arrays = self.arrays.copy()
arrays.insert(loc, value)
if self.refs is not None:
# inserted `value` is already a copy, no need to track reference
# TODO can we use CoW here as well?
self.refs.insert(loc, None)

self.arrays = arrays
self._axes[1] = new_axis
@@ -893,8 +955,28 @@ def idelete(self, indexer):

self.arrays = [self.arrays[i] for i in np.nonzero(to_keep)[0]]
self._axes = [self._axes[0], self._axes[1][to_keep]]
if self.refs is not None:
self.refs = [ref for i, ref in enumerate(self.refs) if to_keep[i]]
return self

def column_setitem(self, loc: int, idx: int | slice | np.ndarray, value):
if self._has_no_reference(loc):
# if no reference -> set array (potentially) inplace
arr = self.arrays[loc]
# TODO we should try to avoid this (indexing.py::_setitem_single_column
# does a copy for the BM path as well)
arr = arr.copy()
else:
# otherwise perform Copy-on-Write and clear the reference
arr = self.arrays[loc].copy()
self._clear_reference(loc)

# create temporary SingleArrayManager without ref to use setitem implementation
mgr = SingleArrayManager([arr], [self._axes[0]])
new_mgr = mgr.setitem((idx,), value)
# update existing ArrayManager in-place
self.arrays[loc] = new_mgr.arrays[0]
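
The intended Copy-on-Write behaviour of `column_setitem` can be sketched with a toy manager. `ToyManager` and `Column` are hypothetical stand-ins; unlike the PR code, which for now copies in both branches (see the TODO above), this sketch writes in place when the column is unshared. It assumes CPython, where a dropped weakref is released immediately.

```python
import weakref


class Column:
    """Hypothetical stand-in for a column array."""

    def __init__(self, values):
        self.values = list(values)

    def copy(self):
        return Column(self.values)


class ToyManager:
    def __init__(self, arrays, refs=None):
        self.arrays = arrays
        self.refs = refs

    def _has_no_reference(self, i):
        own_ref = None if self.refs is None else self.refs[i]
        return own_ref is None and weakref.getweakrefcount(self.arrays[i]) == 0

    def column_setitem(self, loc, idx, value):
        if not self._has_no_reference(loc):
            # Shared column: copy first, then stop tracking the old parent.
            self.arrays[loc] = self.arrays[loc].copy()
            if self.refs is not None:
                self.refs[loc] = None
        self.arrays[loc].values[idx] = value


col = Column([1, 2, 3])
parent = ToyManager([col])
child = ToyManager([col], refs=[weakref.ref(col)])

child.column_setitem(0, 0, 100)  # triggers the copy
```

After the write, the child owns a private column and its weakref is gone, so the parent is unshared again and could be modified in place.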

# --------------------------------------------------------------------
# Array-wise Operation

@@ -1177,10 +1259,12 @@ def __init__(
self,
arrays: list[np.ndarray | ExtensionArray],
axes: list[Index],
refs: list[weakref.ref | None] | None = None,
verify_integrity: bool = True,
):
self._axes = axes
self.arrays = arrays
self.refs = refs

if verify_integrity:
assert len(axes) == 1
@@ -1265,6 +1349,7 @@ def fast_xs(self, loc: int) -> ArrayLike:
raise NotImplementedError("Use series._values[loc] instead")

def get_slice(self, slobj: slice, axis: int = 0) -> SingleArrayManager:
# TODO track reference
if axis >= self.ndim:
raise IndexError("Requested axis not found in manager")

@@ -1275,7 +1360,9 @@ def get_slice(self, slobj: slice, axis: int = 0) -> SingleArrayManager:
def getitem_mgr(self, indexer) -> SingleArrayManager:
new_array = self.array[indexer]
new_index = self.index[indexer]
- return type(self)([new_array], [new_index])
+ # TODO in theory only need to track reference if new_array is a view
+ ref = weakref.ref(self.array)
+ return type(self)([new_array], [new_index], [ref])

def apply(self, func, **kwargs):
if callable(func):
@@ -1284,8 +1371,15 @@ def apply(self, func, **kwargs):
new_array = getattr(self.array, func)(**kwargs)
return type(self)([new_array], self._axes)

- def setitem(self, indexer, value):
- return self.apply_with_block("setitem", indexer=indexer, value=value)
+ def setitem(self, indexer, value, inplace=False):
+ if not self._has_no_reference(0):
+ # if being referenced -> perform Copy-on-Write and clear the reference
+ self.arrays[0] = self.arrays[0].copy()
+ self._clear_reference(0)
+ if inplace:
+ self.arrays[0][indexer] = value
+ else:
+ return self.apply_with_block("setitem", indexer=indexer, value=value)

def idelete(self, indexer) -> SingleArrayManager:
"""
@@ -1296,6 +1390,8 @@ def idelete(self, indexer) -> SingleArrayManager:

self.arrays = [self.arrays[0][to_keep]]
self._axes = [self._axes[0][to_keep]]
# clear reference since we are backed by new array
self.refs = None
return self

def _get_data_subset(self, predicate: Callable) -> SingleArrayManager:
11 changes: 9 additions & 2 deletions pandas/core/internals/managers.py
@@ -351,8 +351,12 @@ def where(self: T, other, cond, align: bool, errors: str) -> T:
errors=errors,
)

- def setitem(self: T, indexer, value) -> T:
- return self.apply("setitem", indexer=indexer, value=value)
+ def setitem(self: T, indexer, value, inplace=False) -> T:
Member
why is this change necessary?

Member Author
To follow the change that was needed in the ArrayManager (to ensure every mutation of the values happens in the manager); see #41879, which splits this change out as a separate precursor PR.

if inplace:
assert self.ndim == 1
self._block.values[indexer] = value
else:
return self.apply("setitem", indexer=indexer, value=value)

def putmask(self, mask, new, align: bool = True):

@@ -566,6 +570,9 @@ def copy(self: T, deep=True) -> T:
-------
BlockManager
"""
if deep is None:
# preserve deep copy for BlockManager with copy=None
deep = True
# this preserves the notion of view copying of axes
if deep:
# hit in e.g. tests.io.json.test_pandas