FEAT-#2520: add most important operations for asv benchmarks (#2539)
* FEAT-#2520: add most important operations for asv benchmarks

Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>

* FEAT-#2520: add groupby microbenchmarks

Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>

* FEAT-#2520: address review comments

Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
anmyachev committed Dec 16, 2020
1 parent f98a7b3 commit d13fe74
Showing 2 changed files with 152 additions and 100 deletions.
241 changes: 141 additions & 100 deletions asv_bench/benchmarks/benchmarks.py
@@ -11,159 +11,198 @@
 # ANY KIND, either express or implied. See the License for the specific language
 # governing permissions and limitations under the License.
 
+# define `MODIN_CPUS` env var to control the number of partitions
+# it should be defined before modin.pandas import (in case of using os.environ)
+
+# define `MODIN_ASV_USE_IMPL` env var to choose library for using in performance
+# measurements
+
 import modin.pandas as pd
 import numpy as np
 import pandas
 
-from modin.config import CpuCount, TestDatasetSize
+from modin.config import TestDatasetSize, AsvImplementation
 from .utils import generate_dataframe, RAND_LOW, RAND_HIGH, random_string
 
-# define `MODIN_CPUS` env var to control the number of partitions
-# it should be defined before modin.pandas import
-pd.DEFAULT_NPARTITIONS = CpuCount.get()
+ASV_USE_IMPL = AsvImplementation.get()
 
 if TestDatasetSize.get() == "Big":
-    MERGE_DATA_SIZE = [
+    BINARY_OP_DATA_SIZE = [
         (5000, 5000, 5000, 5000),
         (10, 1_000_000, 10, 1_000_000),
         (1_000_000, 10, 1_000_000, 10),
+        # the case extremely inefficient
+        # (20, 500_000, 10, 1_000_000),
+        (500_000, 20, 1_000_000, 10),
     ]
-    GROUPBY_DATA_SIZE = [
+    UNARY_OP_DATA_SIZE = [
         (5000, 5000),
-        (10, 1_000_000),
+        # the case extremely inefficient
+        # (10, 1_000_000),
         (1_000_000, 10),
     ]
-    DATA_SIZE = [(50_000, 128)]
 else:
-    MERGE_DATA_SIZE = [
-        (2000, 100, 2000, 100),
+    BINARY_OP_DATA_SIZE = [
+        (256, 256, 256, 256),
+        (20, 10_000, 10, 25_000),
+        (10_000, 20, 25_000, 10),
     ]
-    GROUPBY_DATA_SIZE = [
-        (2000, 100),
+    UNARY_OP_DATA_SIZE = [
+        (256, 256),
+        (10, 10_000),
+        (10_000, 10),
     ]
-    DATA_SIZE = [(10_000, 128)]
 
-JOIN_DATA_SIZE = MERGE_DATA_SIZE
-ARITHMETIC_DATA_SIZE = GROUPBY_DATA_SIZE
-CONCAT_DATA_SIZE = [(10_128, 100, 10_000, 128)]
+
+def trigger_execution(func):
+    def real_executor(*arg, **kwargs):
+        return func(*arg, **kwargs).shape
+
+    return real_executor
 
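Note on the decorator added above: Modin may defer or run work asynchronously, so a benchmark that merely calls an operation can end up timing only task submission. Each `time_*` method below therefore returns its result and is wrapped with `trigger_execution`, which returns the result's `.shape` — a value that generally cannot be produced until the deferred computation has completed inside the timed region. A minimal self-contained sketch of the same pattern (the `TimeSumSketch` class is illustrative only, not part of this commit):

import modin.pandas as pd


def trigger_execution(func):
    # Returning .shape forces the deferred result to materialize
    # inside the timed region instead of after it.
    def real_executor(*args, **kwargs):
        return func(*args, **kwargs).shape

    return real_executor


class TimeSumSketch:  # hypothetical ASV benchmark class
    def setup(self):
        self.df = pd.DataFrame({"a": range(1_000)})

    @trigger_execution
    def time_sum(self):
        return self.df.sum()  # a Series; its .shape is computed in the timed call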
-class TimeGroupBy:
-    param_names = ["impl", "data_type", "data_size"]
+class TimeMultiColumnGroupby:
+    param_names = ["data_size", "count_columns"]
+    params = [UNARY_OP_DATA_SIZE, [6]]
+
+    def setup(self, data_size, count_columns):
+        self.df = generate_dataframe(
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
+        )
+        self.groupby_columns = [col for col in self.df.columns[:count_columns]]
+
+    @trigger_execution
+    def time_groupby_agg_quan(self, data_size, count_columns):
+        return self.df.groupby(by=self.groupby_columns).agg("quantile")
+
+    @trigger_execution
+    def time_groupby_agg_mean(self, data_size, count_columns):
+        return self.df.groupby(by=self.groupby_columns).apply(lambda df: df.mean())
+
+
+class TimeGroupByDefaultAggregations:
+    param_names = ["data_size"]
     params = [
-        ["modin", "pandas"],
-        ["int"],
-        GROUPBY_DATA_SIZE,
+        UNARY_OP_DATA_SIZE,
     ]
 
-    def setup(self, impl, data_type, data_size):
+    def setup(self, data_size):
         self.df = generate_dataframe(
-            impl, data_type, data_size[0], data_size[1], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
+        self.groupby_column = self.df.columns[0]
 
-    def time_groupby_sum(self, impl, data_type, data_size):
-        self.df.groupby(by=self.df.columns[0]).sum()
+    @trigger_execution
+    def time_groupby_count(self, data_size):
+        return self.df.groupby(by=self.groupby_column).count()
 
-    def time_groupby_mean(self, impl, data_type, data_size):
-        self.df.groupby(by=self.df.columns[0]).mean()
+    @trigger_execution
+    def time_groupby_size(self, data_size):
+        return self.df.groupby(by=self.groupby_column).size()
 
-    def time_groupby_count(self, impl, data_type, data_size):
-        self.df.groupby(by=self.df.columns[0]).count()
+    @trigger_execution
+    def time_groupby_sum(self, data_size):
+        return self.df.groupby(by=self.groupby_column).sum()
+
+    @trigger_execution
+    def time_groupby_mean(self, data_size):
+        return self.df.groupby(by=self.groupby_column).mean()
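A note on the asv conventions these classes rely on: asv builds the cross-product of the `params` lists, calls `setup` with each combination before timing, and then passes the same values to every `time_*` method. A hypothetical sketch of just the protocol (not part of this commit):

class TimeSketch:  # hypothetical, for illustration only
    param_names = ["data_size"]
    params = [[(256, 256), (10_000, 10)]]  # one inner list per parameter name

    def setup(self, data_size):
        # called once per parameter combination, before the timed calls
        self.nrows, self.ncols = data_size

    def time_noop(self, data_size):
        # receives exactly the values that setup() received
        pass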

 class TimeJoin:
-    param_names = ["impl", "data_type", "data_size", "how", "sort"]
+    param_names = ["data_size", "how", "sort"]
     params = [
-        ["modin", "pandas"],
-        ["int"],
-        JOIN_DATA_SIZE,
-        ["left", "right", "outer", "inner"],
-        [False, True],
+        BINARY_OP_DATA_SIZE,
+        ["left", "inner"],
+        [False],
     ]
 
-    def setup(self, impl, data_type, data_size, how, sort):
+    def setup(self, data_size, how, sort):
         self.df1 = generate_dataframe(
-            impl, data_type, data_size[0], data_size[1], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
         self.df2 = generate_dataframe(
-            impl, data_type, data_size[2], data_size[3], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH
         )
 
-    def time_join(self, impl, data_type, data_size, how, sort):
-        self.df1.join(
+    @trigger_execution
+    def time_join(self, data_size, how, sort):
+        return self.df1.join(
             self.df2, on=self.df1.columns[0], how=how, lsuffix="left_", sort=sort
         )


 class TimeMerge:
-    param_names = ["impl", "data_type", "data_size", "how", "sort"]
+    param_names = ["data_size", "how", "sort"]
     params = [
-        ["modin", "pandas"],
-        ["int"],
-        MERGE_DATA_SIZE,
-        ["left", "right", "outer", "inner"],
-        [False, True],
+        BINARY_OP_DATA_SIZE,
+        ["left", "inner"],
+        [False],
     ]
 
-    def setup(self, impl, data_type, data_size, how, sort):
+    def setup(self, data_size, how, sort):
         self.df1 = generate_dataframe(
-            impl, data_type, data_size[0], data_size[1], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
         self.df2 = generate_dataframe(
-            impl, data_type, data_size[2], data_size[3], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH
         )
 
-    def time_merge(self, impl, data_type, data_size, how, sort):
-        self.df1.merge(self.df2, on=self.df1.columns[0], how=how, sort=sort)
+    @trigger_execution
+    def time_merge(self, data_size, how, sort):
+        return self.df1.merge(self.df2, on=self.df1.columns[0], how=how, sort=sort)


 class TimeConcat:
-    param_names = ["data_type", "data_size", "how", "axis"]
+    param_names = ["data_size", "how", "axis"]
     params = [
-        ["int"],
-        CONCAT_DATA_SIZE,
+        BINARY_OP_DATA_SIZE,
         ["inner"],
         [0, 1],
     ]
 
-    def setup(self, data_type, data_size, how, axis):
+    def setup(self, data_size, how, axis):
         # shape for generate_dataframe: first - ncols, second - nrows
         self.df1 = generate_dataframe(
-            "modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
         self.df2 = generate_dataframe(
-            "modin", data_type, data_size[3], data_size[2], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH
        )
 
-    def time_concat(self, data_type, data_size, how, axis):
-        pd.concat([self.df1, self.df2], axis=axis, join=how)
+    @trigger_execution
+    def time_concat(self, data_size, how, axis):
+        if ASV_USE_IMPL == "modin":
+            return pd.concat([self.df1, self.df2], axis=axis, join=how)
+        elif ASV_USE_IMPL == "pandas":
+            return pandas.concat([self.df1, self.df2], axis=axis, join=how)
+        else:
+            raise NotImplementedError

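Unlike the method-based benchmarks, `concat` is a module-level function, so `time_concat` has to pick the namespace matching `ASV_USE_IMPL` by hand and fail loudly for unknown values. If more free functions get benchmarked later, the if/elif chain could be folded into a lookup — a sketch under the assumption that only these two implementations are configured (`get_lib` is hypothetical, not part of the commit):

import modin.pandas as pd
import pandas

IMPL_TO_MODULE = {"modin": pd, "pandas": pandas}


def get_lib(impl):
    # Resolve the MODIN_ASV_USE_IMPL value to the module providing concat().
    try:
        return IMPL_TO_MODULE[impl]
    except KeyError:
        raise NotImplementedError(f"unsupported implementation: {impl}")

# usage: get_lib(ASV_USE_IMPL).concat([df1, df2], axis=0, join="inner")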

 class TimeBinaryOp:
-    param_names = ["data_type", "data_size", "binary_op", "axis"]
+    param_names = ["data_size", "binary_op", "axis"]
     params = [
-        ["int"],
-        CONCAT_DATA_SIZE,
+        BINARY_OP_DATA_SIZE,
         ["mul"],
         [0, 1],
     ]
 
-    def setup(self, data_type, data_size, binary_op, axis):
+    def setup(self, data_size, binary_op, axis):
         # shape for generate_dataframe: first - ncols, second - nrows
         self.df1 = generate_dataframe(
-            "modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
         self.df2 = generate_dataframe(
-            "modin", data_type, data_size[3], data_size[2], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH
         )
         self.op = getattr(self.df1, binary_op)
 
-    def time_concat(self, data_type, data_size, binary_op, axis):
-        self.op(self.df2, axis=axis)
+    @trigger_execution
+    def time_binary_op(self, data_size, binary_op, axis):
+        return self.op(self.df2, axis=axis)


 class BaseTimeSetItem:
-    param_names = ["data_type", "data_size", "item_length", "loc", "is_equal_indices"]
+    param_names = ["data_size", "item_length", "loc", "is_equal_indices"]
 
     @staticmethod
     def get_loc(df, loc, axis, item_length):
@@ -182,12 +221,9 @@ def get_loc(df, loc, axis, item_length):
             else (df.axes[axis][range_based_loc], range_based_loc)
         )
 
-    def trigger_execution(self):
-        self.df.shape
-
-    def setup(self, data_type, data_size, item_length, loc, is_equal_indices):
+    def setup(self, data_size, item_length, loc, is_equal_indices):
         self.df = generate_dataframe(
-            "modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         ).copy()
         self.loc, self.iloc = self.get_loc(
             self.df, loc, item_length=item_length, axis=1
@@ -201,65 +237,70 @@ def setup(self, data_type, data_size, item_length, loc, is_equal_indices):

 class TimeSetItem(BaseTimeSetItem):
     params = [
-        ["int"],
-        DATA_SIZE,
+        UNARY_OP_DATA_SIZE,
         [1],
         ["zero", "middle", "last"],
         [True, False],
     ]
 
+    @trigger_execution
     def time_setitem_qc(self, *args, **kwargs):
         self.df[self.loc] = self.item
-        self.trigger_execution()
+        return self.df
 
+    @trigger_execution
     def time_setitem_raw(self, *args, **kwargs):
         self.df[self.loc] = self.item_raw
-        self.trigger_execution()
+        return self.df
 
 
 class TimeInsert(BaseTimeSetItem):
     params = [
-        ["int"],
-        DATA_SIZE,
+        UNARY_OP_DATA_SIZE,
         [1],
         ["zero", "middle", "last"],
         [True, False],
     ]
 
+    @trigger_execution
     def time_insert_qc(self, *args, **kwargs):
         self.df.insert(loc=self.iloc, column=random_string(), value=self.item)
-        self.trigger_execution()
+        return self.df
 
+    @trigger_execution
     def time_insert_raw(self, *args, **kwargs):
         self.df.insert(loc=self.iloc, column=random_string(), value=self.item_raw)
-        self.trigger_execution()
+        return self.df


 class TimeArithmetic:
-    param_names = ["impl", "data_type", "data_size", "axis"]
+    param_names = ["data_size", "axis"]
     params = [
-        ["modin", "pandas"],
-        ["int"],
-        ARITHMETIC_DATA_SIZE,
+        UNARY_OP_DATA_SIZE,
         [0, 1],
     ]
 
-    def setup(self, impl, data_type, data_size, axis):
+    def setup(self, data_size, axis):
         self.df = generate_dataframe(
-            impl, data_type, data_size[0], data_size[1], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
 
-    def time_sum(self, impl, data_type, data_size, axis):
-        self.df.sum(axis=axis)
+    @trigger_execution
+    def time_sum(self, data_size, axis):
+        return self.df.sum(axis=axis)
 
-    def time_median(self, impl, data_type, data_size, axis):
-        self.df.median(axis=axis)
+    @trigger_execution
+    def time_median(self, data_size, axis):
+        return self.df.median(axis=axis)
 
-    def time_nunique(self, impl, data_type, data_size, axis):
-        self.df.nunique(axis=axis)
+    @trigger_execution
+    def time_nunique(self, data_size, axis):
+        return self.df.nunique(axis=axis)
 
-    def time_apply(self, impl, data_type, data_size, axis):
-        self.df.apply(lambda df: df.sum(), axis=axis)
+    @trigger_execution
+    def time_apply(self, data_size, axis):
+        return self.df.apply(lambda df: df.sum(), axis=axis)
 
-    def time_mean(self, impl, data_type, data_size, axis):
-        self.df.mean(axis=axis)
+    @trigger_execution
+    def time_mean(self, data_size, axis):
+        return self.df.mean(axis=axis)
11 changes: 11 additions & 0 deletions modin/config/envvars.py
@@ -221,6 +221,17 @@ class TrackFileLeaks(EnvironmentVariable, type=bool):
     default = sys.platform != "win32"
 
 
+class AsvImplementation(EnvironmentVariable, type=ExactStr):
+    """
+    Allows to select a library that we will use for testing performance.
+    """
+
+    varname = "MODIN_ASV_USE_IMPL"
+    choices = ("modin", "pandas")
+
+    default = "modin"
+
+
 def _check_vars():
     """
     Look out for any environment variables that start with "MODIN_" prefix
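The new `AsvImplementation` config reads `MODIN_ASV_USE_IMPL`, so the same suite can be pointed at plain pandas to get a baseline. As the comment at the top of benchmarks.py notes, such variables must be set before `modin.pandas` is imported — a minimal sketch using `os.environ` (the values shown are illustrative):

import os

# Must run before the first `import modin.pandas`.
os.environ["MODIN_ASV_USE_IMPL"] = "pandas"  # or "modin" (the default)
os.environ["MODIN_CPUS"] = "8"               # controls the number of partitions

import modin.pandas as pd  # noqa: E402  (import after configuration is intentional)

From a shell, the equivalent is prefixing the runner, e.g. `MODIN_ASV_USE_IMPL=pandas asv run`.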
