Skip to content

Commit

Permalink
POC: ArrayManager -- array-based data manager for columnar store (pan…
Browse files Browse the repository at this point in the history
  • Loading branch information
jorisvandenbossche authored and luckyvs1 committed Jan 20, 2021
1 parent 08d83e9 commit 618ac6d
Show file tree
Hide file tree
Showing 59 changed files with 1,389 additions and 72 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,22 @@ jobs:
- name: Upload dev docs
run: rsync -az --delete doc/build/html/ docs@${{ secrets.server_ip }}:/usr/share/nginx/pandas/pandas-docs/dev
if: github.event_name == 'push'

data_manager:
name: Test experimental data manager
runs-on: ubuntu-latest
steps:

- name: Setting conda path
run: echo "${HOME}/miniconda3/bin" >> $GITHUB_PATH

- name: Checkout
uses: actions/checkout@v1

- name: Setup environment and build pandas
run: ci/setup_env.sh

- name: Run tests
run: |
source activate pandas-dev
pytest pandas/tests/frame/methods --array-manager
4 changes: 4 additions & 0 deletions pandas/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from pandas.core.generic import NDFrame # noqa: F401
from pandas.core.groupby.generic import DataFrameGroupBy, SeriesGroupBy
from pandas.core.indexes.base import Index
from pandas.core.internals import ArrayManager, BlockManager
from pandas.core.resample import Resampler
from pandas.core.series import Series
from pandas.core.window.rolling import BaseWindow
Expand Down Expand Up @@ -159,3 +160,6 @@
ColspaceArgType = Union[
str, int, Sequence[Union[str, int]], Mapping[Hashable, Union[str, int]]
]

# internals
Manager = Union["ArrayManager", "BlockManager"]
21 changes: 21 additions & 0 deletions pandas/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ def pytest_addoption(parser):
action="store_true",
help="Fail if a test is skipped for missing data file.",
)
parser.addoption(
"--array-manager",
"--am",
action="store_true",
help="Use the experimental ArrayManager as default data manager.",
)


def pytest_sessionstart(session):
# Note: we need to set the option here and not in pytest_runtest_setup below
# to ensure this is run before creating fixture data
if session.config.getoption("--array-manager"):
pd.options.mode.data_manager = "array"


def pytest_runtest_setup(item):
Expand Down Expand Up @@ -1454,3 +1467,11 @@ def indexer_si(request):
Parametrize over __setitem__, iloc.__setitem__
"""
return request.param


@pytest.fixture
def using_array_manager(request):
"""
Fixture to check if the array manager is being used.
"""
return pd.options.mode.data_manager == "array"
6 changes: 6 additions & 0 deletions pandas/core/config_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,12 @@ def use_inf_as_na_cb(key):
cf.register_option(
"use_inf_as_null", False, use_inf_as_null_doc, cb=use_inf_as_na_cb
)
cf.register_option(
"data_manager",
"block",
"Internal data manager type",
validator=is_one_of_factory(["block", "array"]),
)

cf.deprecate_option(
"mode.use_inf_as_null", msg=use_inf_as_null_doc, rkey="mode.use_inf_as_na"
Expand Down
44 changes: 38 additions & 6 deletions pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
IndexKeyFunc,
IndexLabel,
Level,
Manager,
PythonFuncType,
Renamer,
StorageOptions,
Expand Down Expand Up @@ -137,13 +138,14 @@
)
from pandas.core.indexes.multi import MultiIndex, maybe_droplevels
from pandas.core.indexing import check_bool_indexer, convert_to_index_sliceable
from pandas.core.internals import BlockManager
from pandas.core.internals import ArrayManager, BlockManager
from pandas.core.internals.construction import (
arrays_to_mgr,
dataclasses_to_dicts,
init_dict,
init_ndarray,
masked_rec_array_to_mgr,
mgr_to_mgr,
nested_data_to_arrays,
reorder_arrays,
sanitize_index,
Expand Down Expand Up @@ -523,7 +525,7 @@ def __init__(
if isinstance(data, DataFrame):
data = data._mgr

if isinstance(data, BlockManager):
if isinstance(data, (BlockManager, ArrayManager)):
if index is None and columns is None and dtype is None and copy is False:
# GH#33357 fastpath
NDFrame.__init__(self, data)
Expand Down Expand Up @@ -601,8 +603,31 @@ def __init__(
values, index, columns, dtype=values.dtype, copy=False
)

# ensure correct Manager type according to settings
manager = get_option("mode.data_manager")
mgr = mgr_to_mgr(mgr, typ=manager)

NDFrame.__init__(self, mgr)

def _as_manager(self, typ: str) -> DataFrame:
"""
Private helper function to create a DataFrame with specific manager.
Parameters
----------
typ : {"block", "array"}
Returns
-------
DataFrame
New DataFrame using specified manager type. Is not guaranteed
to be a copy or not.
"""
new_mgr: Manager
new_mgr = mgr_to_mgr(self._mgr, typ=typ)
# fastpath of passing a manager doesn't check the option/manager class
return DataFrame(new_mgr)

# ----------------------------------------------------------------------

@property
Expand Down Expand Up @@ -675,6 +700,8 @@ def _is_homogeneous_type(self) -> bool:
... "B": np.array([1, 2], dtype=np.int64)})._is_homogeneous_type
False
"""
if isinstance(self._mgr, ArrayManager):
return len({arr.dtype for arr in self._mgr.arrays}) == 1
if self._mgr.any_extension_types:
return len({block.dtype for block in self._mgr.blocks}) == 1
else:
Expand All @@ -685,6 +712,8 @@ def _can_fast_transpose(self) -> bool:
"""
Can we transpose this DataFrame without creating any new array objects.
"""
if isinstance(self._mgr, ArrayManager):
return False
if self._mgr.any_extension_types:
# TODO(EA2D) special case would be unnecessary with 2D EAs
return False
Expand Down Expand Up @@ -5506,7 +5535,7 @@ def sort_values( # type: ignore[override]
)

if ignore_index:
new_data.axes[1] = ibase.default_index(len(indexer))
new_data.set_axis(1, ibase.default_index(len(indexer)))

result = self._constructor(new_data)
if inplace:
Expand Down Expand Up @@ -6051,7 +6080,10 @@ def _dispatch_frame_op(self, right, func, axis: Optional[int] = None):
# fails in cases with empty columns reached via
# _frame_arith_method_with_reindex

bm = self._mgr.operate_blockwise(right._mgr, array_op)
# TODO operate_blockwise expects a manager of the same type
bm = self._mgr.operate_blockwise(
right._mgr, array_op # type: ignore[arg-type]
)
return type(self)(bm)

elif isinstance(right, Series) and axis == 1:
Expand Down Expand Up @@ -8894,11 +8926,11 @@ def func(values: np.ndarray):
# We only use this in the case that operates on self.values
return op(values, axis=axis, skipna=skipna, **kwds)

def blk_func(values):
def blk_func(values, axis=1):
if isinstance(values, ExtensionArray):
return values._reduce(name, skipna=skipna, **kwds)
else:
return op(values, axis=1, skipna=skipna, **kwds)
return op(values, axis=axis, skipna=skipna, **kwds)

def _get_data() -> DataFrame:
if filter_type is None:
Expand Down
29 changes: 20 additions & 9 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
IndexLabel,
JSONSerializable,
Level,
Manager,
NpDtype,
Renamer,
StorageOptions,
Expand Down Expand Up @@ -102,7 +103,7 @@
RangeIndex,
ensure_index,
)
from pandas.core.internals import BlockManager
from pandas.core.internals import ArrayManager, BlockManager
from pandas.core.missing import find_valid_index
from pandas.core.ops import align_method_FRAME
from pandas.core.shared_docs import _shared_docs
Expand Down Expand Up @@ -179,7 +180,7 @@ class NDFrame(PandasObject, SelectionMixin, indexing.IndexingMixin):
)
_metadata: List[str] = []
_is_copy = None
_mgr: BlockManager
_mgr: Manager
_attrs: Dict[Optional[Hashable], Any]
_typ: str

Expand All @@ -188,7 +189,7 @@ class NDFrame(PandasObject, SelectionMixin, indexing.IndexingMixin):

def __init__(
self,
data: BlockManager,
data: Manager,
copy: bool = False,
attrs: Optional[Mapping[Optional[Hashable], Any]] = None,
):
Expand All @@ -207,7 +208,7 @@ def __init__(
@classmethod
def _init_mgr(
cls, mgr, axes, dtype: Optional[Dtype] = None, copy: bool = False
) -> BlockManager:
) -> Manager:
""" passed a manager and a axes dict """
for a, axe in axes.items():
if axe is not None:
Expand All @@ -220,7 +221,13 @@ def _init_mgr(
mgr = mgr.copy()
if dtype is not None:
# avoid further copies if we can
if len(mgr.blocks) > 1 or mgr.blocks[0].values.dtype != dtype:
if (
isinstance(mgr, BlockManager)
and len(mgr.blocks) == 1
and mgr.blocks[0].values.dtype == dtype
):
pass
else:
mgr = mgr.astype(dtype=dtype)
return mgr

Expand Down Expand Up @@ -4544,11 +4551,11 @@ def sort_index(
new_data = self._mgr.take(indexer, axis=baxis, verify=False)

# reconstruct axis if needed
new_data.axes[baxis] = new_data.axes[baxis]._sort_levels_monotonic()
new_data.set_axis(baxis, new_data.axes[baxis]._sort_levels_monotonic())

if ignore_index:
axis = 1 if isinstance(self, ABCDataFrame) else 0
new_data.axes[axis] = ibase.default_index(len(indexer))
new_data.set_axis(axis, ibase.default_index(len(indexer)))

result = self._constructor(new_data)

Expand Down Expand Up @@ -5521,6 +5528,8 @@ def _protect_consolidate(self, f):
Consolidate _mgr -- if the blocks have changed, then clear the
cache
"""
if isinstance(self._mgr, ArrayManager):
return f()
blocks_before = len(self._mgr.blocks)
result = f()
if len(self._mgr.blocks) != blocks_before:
Expand Down Expand Up @@ -5710,11 +5719,13 @@ def _to_dict_of_blocks(self, copy: bool_t = True):
Return a dict of dtype -> Constructor Types that
each is a homogeneous dtype.
Internal ONLY
Internal ONLY - only works for BlockManager
"""
mgr = self._mgr
mgr = cast(BlockManager, mgr)
return {
k: self._constructor(v).__finalize__(self)
for k, v, in self._mgr.to_dict(copy=copy).items()
for k, v, in mgr.to_dict(copy=copy).items()
}

def astype(
Expand Down
6 changes: 4 additions & 2 deletions pandas/core/groupby/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1086,10 +1086,12 @@ def py_fallback(bvalues: ArrayLike) -> ArrayLike:
# in the operation. We un-split here.
result = result._consolidate()
assert isinstance(result, (Series, DataFrame)) # for mypy
assert len(result._mgr.blocks) == 1
mgr = result._mgr
assert isinstance(mgr, BlockManager)
assert len(mgr.blocks) == 1

# unwrap DataFrame to get array
result = result._mgr.blocks[0].values
result = mgr.blocks[0].values
return result

def blk_func(bvalues: ArrayLike) -> ArrayLike:
Expand Down
4 changes: 4 additions & 0 deletions pandas/core/internals/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from pandas.core.internals.array_manager import ArrayManager
from pandas.core.internals.base import DataManager
from pandas.core.internals.blocks import ( # io.pytables, io.packers
Block,
BoolBlock,
Expand Down Expand Up @@ -35,6 +37,8 @@
"TimeDeltaBlock",
"safe_reshape",
"make_block",
"DataManager",
"ArrayManager",
"BlockManager",
"SingleBlockManager",
"concatenate_block_managers",
Expand Down
Loading

0 comments on commit 618ac6d

Please sign in to comment.