-
-
Notifications
You must be signed in to change notification settings - Fork 18k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC: ArrayManager -- array-based data manager for columnar store #36010
Changes from 36 commits
a51835b
591579b
896080a
f9c4dda
d18082a
cf3c07a
b252c6d
0fb645e
eb55fef
d241f31
47c3ee3
75f7de2
f36e395
be20816
8b7cc81
a239f50
a0ccf9a
dc1b190
55d38be
3dea0d7
1a61333
9751d33
e45b645
3749c7d
af53040
cc45673
27cf215
f6a97df
67c4c2b
670ed76
5128ad1
5c73688
a9a8c2d
c7898fb
3430307
1a30013
ef86b1e
c5548d9
afe8f80
b88c757
ddc51d0
9dc5600
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -138,13 +138,14 @@ | |
) | ||
from pandas.core.indexes.multi import MultiIndex, maybe_droplevels | ||
from pandas.core.indexing import check_bool_indexer, convert_to_index_sliceable | ||
from pandas.core.internals import BlockManager | ||
from pandas.core.internals import ArrayManager, BlockManager | ||
from pandas.core.internals.construction import ( | ||
arrays_to_mgr, | ||
dataclasses_to_dicts, | ||
init_dict, | ||
init_ndarray, | ||
masked_rec_array_to_mgr, | ||
mgr_to_mgr, | ||
nested_data_to_arrays, | ||
reorder_arrays, | ||
sanitize_index, | ||
|
@@ -524,7 +525,7 @@ def __init__( | |
if isinstance(data, DataFrame): | ||
data = data._mgr | ||
|
||
if isinstance(data, BlockManager): | ||
if isinstance(data, (BlockManager, ArrayManager)): | ||
if index is None and columns is None and dtype is None and copy is False: | ||
# GH#33357 fastpath | ||
NDFrame.__init__(self, data) | ||
|
@@ -602,8 +603,31 @@ def __init__( | |
values, index, columns, dtype=values.dtype, copy=False | ||
) | ||
|
||
# ensure correct Manager type according to settings | ||
manager = get_option("mode.data_manager") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you move this to the module level? otherwise this would be constantly checked There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, because you need to be able to change the setting after initial import. |
||
mgr = mgr_to_mgr(mgr, typ=manager) | ||
|
||
NDFrame.__init__(self, mgr) | ||
|
||
def _as_manager(self, typ: str) -> DataFrame: | ||
""" | ||
Private helper function to create a DataFrame with specific manager. | ||
|
||
Parameters | ||
---------- | ||
typ : {"block", "array"} | ||
|
||
Returns | ||
------- | ||
DataFrame | ||
New DataFrame using specified manager type. Is not guaranteed | ||
to be a copy or not. | ||
""" | ||
new_mgr: Union[BlockManager, ArrayManager] | ||
new_mgr = mgr_to_mgr(self._mgr, typ=typ) | ||
# fastpath of passing a manager doesn't check the option/manager class | ||
return DataFrame(new_mgr) | ||
|
||
# ---------------------------------------------------------------------- | ||
|
||
@property | ||
|
@@ -676,6 +700,8 @@ def _is_homogeneous_type(self) -> bool: | |
... "B": np.array([1, 2], dtype=np.int64)})._is_homogeneous_type | ||
False | ||
""" | ||
if isinstance(self._mgr, ArrayManager): | ||
return False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this different? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I need to return something here, as the code below relies on BlockManager internals. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add comments where appropriate to point out places where things are temporary/POC? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this specific case, simply updated it to do the actual correct thing (check the number of unique types). I think other edits are either OK or already have a TODO note |
||
if self._mgr.any_extension_types: | ||
return len({block.dtype for block in self._mgr.blocks}) == 1 | ||
else: | ||
|
@@ -686,6 +712,8 @@ def _can_fast_transpose(self) -> bool: | |
""" | ||
Can we transpose this DataFrame without creating any new array objects. | ||
""" | ||
if isinstance(self._mgr, ArrayManager): | ||
return False | ||
if self._mgr.any_extension_types: | ||
# TODO(EA2D) special case would be unnecessary with 2D EAs | ||
return False | ||
|
@@ -5507,7 +5535,7 @@ def sort_values( # type: ignore[override] | |
) | ||
|
||
if ignore_index: | ||
new_data.axes[1] = ibase.default_index(len(indexer)) | ||
new_data.set_axis(1, ibase.default_index(len(indexer))) | ||
jbrockmendel marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
result = self._constructor(new_data) | ||
if inplace: | ||
|
@@ -6052,7 +6080,10 @@ def _dispatch_frame_op(self, right, func, axis: Optional[int] = None): | |
# fails in cases with empty columns reached via | ||
# _frame_arith_method_with_reindex | ||
|
||
bm = self._mgr.operate_blockwise(right._mgr, array_op) | ||
# TODO operate_blockwise expects a manager of the same type | ||
bm = self._mgr.operate_blockwise( | ||
right._mgr, array_op # type: ignore[arg-type] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are you adding the type ignore? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's related to the TODO: operate_blockwise complains about getting a Union[BlockManager, ArrayManager], while it either expects a BlockManger if it gets called from a BlockManager, or ArrayManager if it gets called from ArrayManager. That's something that needs to be fixed in general (hence the TODO), but as long as the actual issue is not solved, it's easier to add a type ignore. |
||
) | ||
return type(self)(bm) | ||
|
||
elif isinstance(right, Series) and axis == 1: | ||
|
@@ -8897,11 +8928,11 @@ def func(values: np.ndarray): | |
# We only use this in the case that operates on self.values | ||
return op(values, axis=axis, skipna=skipna, **kwds) | ||
|
||
def blk_func(values): | ||
def blk_func(values, axis=1): | ||
if isinstance(values, ExtensionArray): | ||
return values._reduce(name, skipna=skipna, **kwds) | ||
else: | ||
return op(values, axis=1, skipna=skipna, **kwds) | ||
return op(values, axis=axis, skipna=skipna, **kwds) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this stays 1 for BlockManager but becomes 0 for ArrayManager? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the default stays the same. And for BlockManager, this default is used. But this way, it allows us to override it for the ArrayManager reduce method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does axis ever actually get passed to blk_func? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, as mentioned above, the ArrayManager.reduce method uses this |
||
|
||
def _get_data() -> DataFrame: | ||
if filter_type is None: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -103,7 +103,7 @@ | |
RangeIndex, | ||
ensure_index, | ||
) | ||
from pandas.core.internals import BlockManager | ||
from pandas.core.internals import ArrayManager, BlockManager | ||
from pandas.core.missing import find_valid_index | ||
from pandas.core.ops import align_method_FRAME | ||
from pandas.core.shared_docs import _shared_docs | ||
|
@@ -184,7 +184,7 @@ class NDFrame(PandasObject, SelectionMixin, indexing.IndexingMixin): | |
) | ||
_metadata: List[str] = [] | ||
_is_copy = None | ||
_mgr: BlockManager | ||
_mgr: Union[BlockManager, ArrayManager] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should make this an actual type in _typing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pls do this |
||
_attrs: Dict[Optional[Hashable], Any] | ||
_typ: str | ||
|
||
|
@@ -193,7 +193,7 @@ class NDFrame(PandasObject, SelectionMixin, indexing.IndexingMixin): | |
|
||
def __init__( | ||
self, | ||
data: BlockManager, | ||
data: Union[BlockManager, ArrayManager], | ||
copy: bool = False, | ||
attrs: Optional[Mapping[Optional[Hashable], Any]] = None, | ||
): | ||
|
@@ -210,7 +210,9 @@ def __init__( | |
object.__setattr__(self, "_flags", Flags(self, allows_duplicate_labels=True)) | ||
|
||
@classmethod | ||
def _init_mgr(cls, mgr, axes, dtype=None, copy: bool = False) -> BlockManager: | ||
def _init_mgr( | ||
cls, mgr, axes, dtype=None, copy: bool = False | ||
) -> Union[BlockManager, ArrayManager]: | ||
""" passed a manager and a axes dict """ | ||
for a, axe in axes.items(): | ||
if axe is not None: | ||
|
@@ -223,7 +225,13 @@ def _init_mgr(cls, mgr, axes, dtype=None, copy: bool = False) -> BlockManager: | |
mgr = mgr.copy() | ||
if dtype is not None: | ||
# avoid further copies if we can | ||
if len(mgr.blocks) > 1 or mgr.blocks[0].values.dtype != dtype: | ||
if ( | ||
isinstance(mgr, BlockManager) | ||
and len(mgr.blocks) == 1 | ||
and mgr.blocks[0].values.dtype == dtype | ||
): | ||
pass | ||
else: | ||
mgr = mgr.astype(dtype=dtype) | ||
return mgr | ||
|
||
|
@@ -4547,11 +4555,11 @@ def sort_index( | |
new_data = self._mgr.take(indexer, axis=baxis, verify=False) | ||
|
||
# reconstruct axis if needed | ||
new_data.axes[baxis] = new_data.axes[baxis]._sort_levels_monotonic() | ||
new_data.set_axis(baxis, new_data.axes[baxis]._sort_levels_monotonic()) | ||
|
||
if ignore_index: | ||
axis = 1 if isinstance(self, ABCDataFrame) else 0 | ||
new_data.axes[axis] = ibase.default_index(len(indexer)) | ||
new_data.set_axis(axis, ibase.default_index(len(indexer))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you comment on why these need to be changed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In hindsight, it's probably not needed anymore. Originally, I changed the So with that change, I could revert those edits (I think, but can check that) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just checked this again, and so this change is still needed (without it I get failures for the array manager). But based on a quick check, this seems to be the only place where we assign to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks for explaining. we should make another attempt to get axes out of FooManager before too long |
||
|
||
result = self._constructor(new_data) | ||
|
||
|
@@ -5524,6 +5532,8 @@ def _protect_consolidate(self, f): | |
Consolidate _mgr -- if the blocks have changed, then clear the | ||
cache | ||
""" | ||
if isinstance(self._mgr, ArrayManager): | ||
return f() | ||
blocks_before = len(self._mgr.blocks) | ||
result = f() | ||
if len(self._mgr.blocks) != blocks_before: | ||
|
@@ -5713,11 +5723,13 @@ def _to_dict_of_blocks(self, copy: bool_t = True): | |
Return a dict of dtype -> Constructor Types that | ||
each is a homogeneous dtype. | ||
|
||
Internal ONLY | ||
Internal ONLY - only works for BlockManager | ||
""" | ||
mgr = self._mgr | ||
mgr = cast(BlockManager, mgr) | ||
return { | ||
k: self._constructor(v).__finalize__(self) | ||
for k, v, in self._mgr.to_dict(copy=copy).items() | ||
for k, v, in mgr.to_dict(copy=copy).items() | ||
} | ||
|
||
def astype( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these have a common base class?