Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#2169: avoid unnecessary index access in groupby #2469

Merged
merged 2 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions modin/backends/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1621,49 +1621,63 @@ def has_multiindex(self, axis=0):
assert axis == 1
return isinstance(self.columns, pandas.MultiIndex)

def get_index_name(self):
def get_index_name(self, axis=0):
"""
Get index name.
Get index name of specified axis.

Parameters
----------
axis: int (default 0),
Axis to return index name on.

Returns
-------
hashable
Index name, None for MultiIndex.
"""
return self.index.name
anmyachev marked this conversation as resolved.
Show resolved Hide resolved
return self.get_axis(axis).name

def set_index_name(self, name):
def set_index_name(self, name, axis=0):
"""
Set index name.
Set index name for the specified axis.

Parameters
----------
name: hashable
name: hashable,
New index name.
axis: int (default 0),
Axis to set name along.
"""
self.index.name = name
self.get_axis(axis).name = name

def get_index_names(self):
def get_index_names(self, axis=0):
"""
Get index names.
Get index names of specified axis.

Parameters
----------
axis: int (default 0),
Axis to return index names on.

Returns
-------
list
Index names.
"""
return self.index.names
return self.get_axis(axis).names

def set_index_names(self, names):
def set_index_names(self, names, axis=0):
"""
Set index names.
Set index names for the specified axis.
anmyachev marked this conversation as resolved.
Show resolved Hide resolved

Parameters
----------
names: list
names: list,
New index names.
axis: int (default 0),
Axis to set names along.
"""
self.index.names = names
self.get_axis(axis).names = names

# DateTime methods

Expand Down
19 changes: 0 additions & 19 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2969,22 +2969,3 @@ def cat_codes(self):
return self.default_to_pandas(lambda df: df[df.columns[0]].cat.codes)

# END Cat operations

def has_multiindex(self, axis=0):
"""
Check if specified axis is indexed by MultiIndex.

Parameters
----------
axis : 0 or 1, default 0
The axis to check (0 - index, 1 - columns).

Returns
-------
bool
True if index at specified axis is MultiIndex and False otherwise.
"""
if axis == 0:
return isinstance(self.index, pandas.MultiIndex)
assert axis == 1
return isinstance(self.columns, pandas.MultiIndex)
22 changes: 14 additions & 8 deletions modin/experimental/backends/omnisci/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,17 +633,23 @@ def has_multiindex(self, axis=0):
assert axis == 1
return isinstance(self.columns, pandas.MultiIndex)

def get_index_name(self):
return self._modin_frame.get_index_name()
def get_index_name(self, axis=0):
return self.columns.name if axis else self._modin_frame.get_index_name()
anmyachev marked this conversation as resolved.
Show resolved Hide resolved

def set_index_name(self, name):
self._modin_frame = self._modin_frame.set_index_name(name)
def set_index_name(self, name, axis=0):
if axis == 0:
self._modin_frame = self._modin_frame.set_index_name(name)
else:
self.columns.name = name

def get_index_names(self):
return self._modin_frame.get_index_names()
def get_index_names(self, axis=0):
return self.columns.names if axis else self._modin_frame.get_index_names()
anmyachev marked this conversation as resolved.
Show resolved Hide resolved

def set_index_names(self, names):
self._modin_frame = self._modin_frame.set_index_names(names)
def set_index_names(self, names=None, axis=0):
if axis == 0:
self._modin_frame = self._modin_frame.set_index_names(names)
else:
self.columns.names = names

def free(self):
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,17 @@ def run_and_compare(
**kwargs
):
def run_modin(
fn, data, data2, force_lazy, force_arrow_execute, allow_subqueries, **kwargs
fn,
data,
data2,
force_lazy,
force_arrow_execute,
allow_subqueries,
constructor_kwargs,
**kwargs
):
kwargs["df1"] = pd.DataFrame(data)
kwargs["df2"] = pd.DataFrame(data2)
kwargs["df1"] = pd.DataFrame(data, **constructor_kwargs)
kwargs["df2"] = pd.DataFrame(data2, **constructor_kwargs)
kwargs["df"] = kwargs["df1"]

if force_lazy:
Expand All @@ -76,9 +83,10 @@ def run_modin(

return exp_res

constructor_kwargs = kwargs.pop("constructor_kwargs", {})
try:
kwargs["df1"] = pandas.DataFrame(data)
kwargs["df2"] = pandas.DataFrame(data2)
kwargs["df1"] = pandas.DataFrame(data, **constructor_kwargs)
kwargs["df2"] = pandas.DataFrame(data2, **constructor_kwargs)
kwargs["df"] = kwargs["df1"]
ref_res = fn(lib=pandas, **kwargs)
except Exception as e:
Expand All @@ -90,6 +98,7 @@ def run_modin(
force_lazy=force_lazy,
force_arrow_execute=force_arrow_execute,
allow_subqueries=allow_subqueries,
constructor_kwargs=constructor_kwargs,
**kwargs
)
_ = exp_res.index
Expand All @@ -101,6 +110,7 @@ def run_modin(
force_lazy=force_lazy,
force_arrow_execute=force_arrow_execute,
allow_subqueries=allow_subqueries,
constructor_kwargs=constructor_kwargs,
**kwargs
)
df_equals(ref_res, exp_res)
Expand Down Expand Up @@ -634,6 +644,15 @@ def groupby_mean(df, cols, as_index, **kwargs):

run_and_compare(groupby_mean, data=self.data, cols=cols, as_index=as_index)

def test_groupby_lazy_multiindex(self):
index = generate_multiindex(len(self.data["a"]))

def groupby(df, *args, **kwargs):
df = df + 1
return df.groupby("a").agg({"b": "size"})

run_and_compare(groupby, data=self.data, constructor_kwargs={"index": index})

taxi_data = {
"a": [1, 1, 2, 2],
"b": [11, 21, 12, 11],
Expand Down
12 changes: 4 additions & 8 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,9 @@ def groupby(
elif isinstance(by, str):
drop = by in self.columns
idx_name = by
if (
self._query_compiler.has_multiindex(axis=axis)
and by in self.axes[axis].names
or hasattr(self.axes[axis], "name")
and self.axes[axis].name == by
):
if self._query_compiler.has_multiindex(
axis=axis
) and by in self._query_compiler.get_index_names(axis):
# In this case we pass the string value of the name through to the
# partitions. This is more efficient than broadcasting the values.
pass
Expand Down Expand Up @@ -419,8 +416,7 @@ def groupby(
if mismatch and all(
isinstance(obj, str)
and (
obj in self
or (hasattr(self.index, "names") and obj in self.index.names)
obj in self or obj in self._query_compiler.get_index_names(axis)
)
for obj in by
):
Expand Down
21 changes: 11 additions & 10 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,15 @@ def _shift(periods, freq, axis, fill_value, is_set_nan_rows=True):
)
result.index = pandas.MultiIndex.from_arrays(
new_idx_lvl_arrays,
names=[col_name for col_name in self._by.columns] + [result.index.name],
names=[col_name for col_name in self._by.columns]
+ [result._query_compiler.get_index_name()],
)
result = result.dropna(subset=self._by.columns).sort_index()
else:
result = self._apply_agg_function(
lambda df: df.shift(periods, freq, axis, fill_value)
)
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def nth(self, n, dropna=None):
Expand All @@ -240,7 +241,7 @@ def nth(self, n, dropna=None):
def cumsum(self, axis=0, *args, **kwargs):
result = self._apply_agg_function(lambda df: df.cumsum(axis, *args, **kwargs))
# pandas does not name the index on cumsum
result.index.name = None
result._query_compiler.set_index_name(None)
return result

@property
Expand All @@ -258,7 +259,7 @@ def filter(self, func, dropna=True, *args, **kwargs):
def cummax(self, axis=0, **kwargs):
result = self._apply_agg_function(lambda df: df.cummax(axis, **kwargs))
# pandas does not name the index on cummax
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def apply(self, func, *args, **kwargs):
Expand Down Expand Up @@ -340,7 +341,7 @@ def __getitem__(self, key):
def cummin(self, axis=0, **kwargs):
result = self._apply_agg_function(lambda df: df.cummin(axis=axis, **kwargs))
# pandas does not name the index on cummin
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def bfill(self, limit=None):
Expand Down Expand Up @@ -438,7 +439,7 @@ def mad(self, **kwargs):
def rank(self, **kwargs):
result = self._apply_agg_function(lambda df: df.rank(**kwargs))
# pandas does not name the index on rank
result.index.name = None
result._query_compiler.set_index_name(None)
return result

@property
Expand Down Expand Up @@ -597,7 +598,7 @@ def head(self, n=5):
def cumprod(self, axis=0, *args, **kwargs):
result = self._apply_agg_function(lambda df: df.cumprod(axis, *args, **kwargs))
# pandas does not name the index on cumprod
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def __iter__(self):
Expand All @@ -611,7 +612,7 @@ def transform(self, func, *args, **kwargs):
lambda df: df.transform(func, *args, **kwargs)
)
# pandas does not name the index on transform
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def corr(self, **kwargs):
Expand All @@ -620,7 +621,7 @@ def corr(self, **kwargs):
def fillna(self, **kwargs):
result = self._apply_agg_function(lambda df: df.fillna(**kwargs))
# pandas does not name the index on fillna
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def count(self, **kwargs):
Expand All @@ -641,7 +642,7 @@ def pipe(self, func, *args, **kwargs):
def cumcount(self, ascending=True):
result = self._default_to_pandas(lambda df: df.cumcount(ascending=ascending))
# pandas does not name the index on cumcount
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def tail(self, n=5):
Expand Down