From d13fe74d452af2dcefa762a4b5e76f26157a568d Mon Sep 17 00:00:00 2001
From: Anatoly Myachev <45976948+anmyachev@users.noreply.github.com>
Date: Wed, 16 Dec 2020 22:06:38 +0300
Subject: [PATCH] FEAT-#2520: add most important operations for asv benchmarks
 (#2539)

* FEAT-#2520: add most important operations for asv benchmarks

Signed-off-by: Anatoly Myachev

* FEAT-#2520: add groupby microbenchmarks

Signed-off-by: Anatoly Myachev

* FEAT-#2520: address review comments

Signed-off-by: Anatoly Myachev
---
 asv_bench/benchmarks/benchmarks.py | 241 +++++++++++++++++------
 modin/config/envvars.py            |  11 ++
 2 files changed, 152 insertions(+), 100 deletions(-)

diff --git a/asv_bench/benchmarks/benchmarks.py b/asv_bench/benchmarks/benchmarks.py
index e1026427eeb..281b18ca924 100644
--- a/asv_bench/benchmarks/benchmarks.py
+++ b/asv_bench/benchmarks/benchmarks.py
@@ -11,159 +11,198 @@
 # ANY KIND, either express or implied. See the License for the specific language
 # governing permissions and limitations under the License.
 
+# Define the `MODIN_CPUS` env var to control the number of partitions;
+# it must be set before `modin.pandas` is imported (e.g. when using os.environ).
+
+# Define the `MODIN_ASV_USE_IMPL` env var to choose the library used in
+# performance measurements.
+
 import modin.pandas as pd
 import numpy as np
+import pandas
 
-from modin.config import CpuCount, TestDatasetSize
+from modin.config import TestDatasetSize, AsvImplementation
 
 from .utils import generate_dataframe, RAND_LOW, RAND_HIGH, random_string
 
-# define `MODIN_CPUS` env var to control the number of partitions
-# it should be defined before modin.pandas import
-pd.DEFAULT_NPARTITIONS = CpuCount.get()
+ASV_USE_IMPL = AsvImplementation.get()
 
 if TestDatasetSize.get() == "Big":
-    MERGE_DATA_SIZE = [
+    BINARY_OP_DATA_SIZE = [
         (5000, 5000, 5000, 5000),
-        (10, 1_000_000, 10, 1_000_000),
-        (1_000_000, 10, 1_000_000, 10),
+        # this case is extremely inefficient
+        # (20, 500_000, 10, 1_000_000),
+        (500_000, 20, 1_000_000, 10),
     ]
-    GROUPBY_DATA_SIZE = [
+    UNARY_OP_DATA_SIZE = [
         (5000, 5000),
-        (10, 1_000_000),
+        # this case is extremely inefficient
+        # (10, 1_000_000),
         (1_000_000, 10),
     ]
-    DATA_SIZE = [(50_000, 128)]
 else:
-    MERGE_DATA_SIZE = [
-        (2000, 100, 2000, 100),
+    BINARY_OP_DATA_SIZE = [
+        (256, 256, 256, 256),
+        (20, 10_000, 10, 25_000),
+        (10_000, 20, 25_000, 10),
     ]
-    GROUPBY_DATA_SIZE = [
-        (2000, 100),
+    UNARY_OP_DATA_SIZE = [
+        (256, 256),
+        (10, 10_000),
+        (10_000, 10),
     ]
-    DATA_SIZE = [(10_000, 128)]
 
-JOIN_DATA_SIZE = MERGE_DATA_SIZE
-ARITHMETIC_DATA_SIZE = GROUPBY_DATA_SIZE
-CONCAT_DATA_SIZE = [(10_128, 100, 10_000, 128)]
 
+def trigger_execution(func):
+    def real_executor(*args, **kwargs):
+        return func(*args, **kwargs).shape
 
-class TimeGroupBy:
-    param_names = ["impl", "data_type", "data_size"]
+    return real_executor
+
+
+class TimeMultiColumnGroupby:
+    param_names = ["data_size", "count_columns"]
+    params = [UNARY_OP_DATA_SIZE, [6]]
+
+    def setup(self, data_size, count_columns):
+        self.df = generate_dataframe(
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
+        )
+        self.groupby_columns = [col for col in self.df.columns[:count_columns]]
+
+    @trigger_execution
+    def time_groupby_agg_quan(self, data_size, count_columns):
+        return self.df.groupby(by=self.groupby_columns).agg("quantile")
+
+    @trigger_execution
+    def time_groupby_agg_mean(self, data_size, count_columns):
+        return self.df.groupby(by=self.groupby_columns).apply(lambda df: df.mean())
+
+
+class TimeGroupByDefaultAggregations:
+    param_names = ["data_size"]
     params = [
-        ["modin", "pandas"],
-        ["int"],
-        GROUPBY_DATA_SIZE,
+        UNARY_OP_DATA_SIZE,
     ]
 
-    def setup(self, impl, data_type, data_size):
+    def setup(self, data_size):
         self.df = generate_dataframe(
-            impl, data_type, data_size[0], data_size[1], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
+        self.groupby_column = self.df.columns[0]
 
-    def time_groupby_sum(self, impl, data_type, data_size):
-        self.df.groupby(by=self.df.columns[0]).sum()
+    @trigger_execution
+    def time_groupby_count(self, data_size):
+        return self.df.groupby(by=self.groupby_column).count()
 
-    def time_groupby_mean(self, impl, data_type, data_size):
-        self.df.groupby(by=self.df.columns[0]).mean()
+    @trigger_execution
+    def time_groupby_size(self, data_size):
+        return self.df.groupby(by=self.groupby_column).size()
 
-    def time_groupby_count(self, impl, data_type, data_size):
-        self.df.groupby(by=self.df.columns[0]).count()
+    @trigger_execution
+    def time_groupby_sum(self, data_size):
+        return self.df.groupby(by=self.groupby_column).sum()
+
+    @trigger_execution
+    def time_groupby_mean(self, data_size):
+        return self.df.groupby(by=self.groupby_column).mean()
 
 
 class TimeJoin:
-    param_names = ["impl", "data_type", "data_size", "how", "sort"]
+    param_names = ["data_size", "how", "sort"]
     params = [
-        ["modin", "pandas"],
-        ["int"],
-        JOIN_DATA_SIZE,
-        ["left", "right", "outer", "inner"],
-        [False, True],
+        BINARY_OP_DATA_SIZE,
+        ["left", "inner"],
+        [False],
     ]
 
-    def setup(self, impl, data_type, data_size, how, sort):
+    def setup(self, data_size, how, sort):
         self.df1 = generate_dataframe(
-            impl, data_type, data_size[0], data_size[1], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
         self.df2 = generate_dataframe(
-            impl, data_type, data_size[2], data_size[3], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH
         )
 
-    def time_join(self, impl, data_type, data_size, how, sort):
-        self.df1.join(
+    @trigger_execution
+    def time_join(self, data_size, how, sort):
+        return self.df1.join(
             self.df2, on=self.df1.columns[0], how=how, lsuffix="left_", sort=sort
         )
 
 
 class TimeMerge:
-    param_names = ["impl", "data_type", "data_size", "how", "sort"]
+    param_names = ["data_size", "how", "sort"]
     params = [
-        ["modin", "pandas"],
-        ["int"],
-        MERGE_DATA_SIZE,
-        ["left", "right", "outer", "inner"],
-        [False, True],
+        BINARY_OP_DATA_SIZE,
+        ["left", "inner"],
+        [False],
    ]
 
-    def setup(self, impl, data_type, data_size, how, sort):
+    def setup(self, data_size, how, sort):
         self.df1 = generate_dataframe(
-            impl, data_type, data_size[0], data_size[1], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
         self.df2 = generate_dataframe(
-            impl, data_type, data_size[2], data_size[3], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH
         )
 
-    def time_merge(self, impl, data_type, data_size, how, sort):
-        self.df1.merge(self.df2, on=self.df1.columns[0], how=how, sort=sort)
+    @trigger_execution
+    def time_merge(self, data_size, how, sort):
+        return self.df1.merge(self.df2, on=self.df1.columns[0], how=how, sort=sort)
 
 
 class TimeConcat:
-    param_names = ["data_type", "data_size", "how", "axis"]
+    param_names = ["data_size", "how", "axis"]
     params = [
-        ["int"],
-        CONCAT_DATA_SIZE,
+        BINARY_OP_DATA_SIZE,
         ["inner"],
         [0, 1],
     ]
 
-    def setup(self, data_type, data_size, how, axis):
+    def setup(self, data_size, how, axis):
         # shape for generate_dataframe: first - ncols, second - nrows
         self.df1 = generate_dataframe(
-            "modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
         self.df2 = generate_dataframe(
-            "modin", data_type, data_size[3], data_size[2], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH
         )
 
-    def time_concat(self, data_type, data_size, how, axis):
-        pd.concat([self.df1, self.df2], axis=axis, join=how)
+    @trigger_execution
+    def time_concat(self, data_size, how, axis):
+        if ASV_USE_IMPL == "modin":
+            return pd.concat([self.df1, self.df2], axis=axis, join=how)
+        elif ASV_USE_IMPL == "pandas":
+            return pandas.concat([self.df1, self.df2], axis=axis, join=how)
+        else:
+            raise NotImplementedError
 
 
 class TimeBinaryOp:
-    param_names = ["data_type", "data_size", "binary_op", "axis"]
+    param_names = ["data_size", "binary_op", "axis"]
     params = [
-        ["int"],
-        CONCAT_DATA_SIZE,
+        BINARY_OP_DATA_SIZE,
         ["mul"],
         [0, 1],
     ]
 
-    def setup(self, data_type, data_size, binary_op, axis):
+    def setup(self, data_size, binary_op, axis):
         # shape for generate_dataframe: first - ncols, second - nrows
         self.df1 = generate_dataframe(
-            "modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
         self.df2 = generate_dataframe(
-            "modin", data_type, data_size[3], data_size[2], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[3], data_size[2], RAND_LOW, RAND_HIGH
         )
         self.op = getattr(self.df1, binary_op)
 
-    def time_concat(self, data_type, data_size, binary_op, axis):
-        self.op(self.df2, axis=axis)
+    @trigger_execution
+    def time_binary_op(self, data_size, binary_op, axis):
+        return self.op(self.df2, axis=axis)
 
 
 class BaseTimeSetItem:
-    param_names = ["data_type", "data_size", "item_length", "loc", "is_equal_indices"]
+    param_names = ["data_size", "item_length", "loc", "is_equal_indices"]
 
     @staticmethod
     def get_loc(df, loc, axis, item_length):
@@ -182,12 +221,9 @@ def get_loc(df, loc, axis, item_length):
             else (df.axes[axis][range_based_loc], range_based_loc)
         )
 
-    def trigger_execution(self):
-        self.df.shape
-
-    def setup(self, data_type, data_size, item_length, loc, is_equal_indices):
+    def setup(self, data_size, item_length, loc, is_equal_indices):
         self.df = generate_dataframe(
-            "modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         ).copy()
         self.loc, self.iloc = self.get_loc(
             self.df, loc, item_length=item_length, axis=1
         )
@@ -201,65 +237,70 @@ def setup(self, data_type, data_size, item_length, loc, is_equal_indices):
 
 class TimeSetItem(BaseTimeSetItem):
     params = [
-        ["int"],
-        DATA_SIZE,
+        UNARY_OP_DATA_SIZE,
         [1],
         ["zero", "middle", "last"],
         [True, False],
     ]
 
+    @trigger_execution
     def time_setitem_qc(self, *args, **kwargs):
         self.df[self.loc] = self.item
-        self.trigger_execution()
+        return self.df
 
+    @trigger_execution
     def time_setitem_raw(self, *args, **kwargs):
         self.df[self.loc] = self.item_raw
-        self.trigger_execution()
+        return self.df
 
 
 class TimeInsert(BaseTimeSetItem):
     params = [
-        ["int"],
-        DATA_SIZE,
+        UNARY_OP_DATA_SIZE,
         [1],
         ["zero", "middle", "last"],
         [True, False],
     ]
 
+    @trigger_execution
     def time_insert_qc(self, *args, **kwargs):
         self.df.insert(loc=self.iloc, column=random_string(), value=self.item)
-        self.trigger_execution()
+        return self.df
 
+    @trigger_execution
     def time_insert_raw(self, *args, **kwargs):
         self.df.insert(loc=self.iloc, column=random_string(), value=self.item_raw)
-        self.trigger_execution()
+        return self.df
 
 
 class TimeArithmetic:
-    param_names = ["impl", "data_type", "data_size", "axis"]
+    param_names = ["data_size", "axis"]
     params = [
-        ["modin", "pandas"],
-        ["int"],
-        ARITHMETIC_DATA_SIZE,
+        UNARY_OP_DATA_SIZE,
         [0, 1],
     ]
 
-    def setup(self, impl, data_type, data_size, axis):
+    def setup(self, data_size, axis):
         self.df = generate_dataframe(
-            impl, data_type, data_size[0], data_size[1], RAND_LOW, RAND_HIGH
+            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
         )
 
-    def time_sum(self, impl, data_type, data_size, axis):
-        self.df.sum(axis=axis)
+    @trigger_execution
+    def time_sum(self, data_size, axis):
+        return self.df.sum(axis=axis)
+
+    @trigger_execution
+    def time_median(self, data_size, axis):
+        return self.df.median(axis=axis)
 
-    def time_median(self, impl, data_type, data_size, axis):
-        self.df.median(axis=axis)
+    @trigger_execution
+    def time_nunique(self, data_size, axis):
+        return self.df.nunique(axis=axis)
 
-    def time_nunique(self, impl, data_type, data_size, axis):
-        self.df.nunique(axis=axis)
+    @trigger_execution
+    def time_apply(self, data_size, axis):
+        return self.df.apply(lambda df: df.sum(), axis=axis)
 
-    def time_apply(self, impl, data_type, data_size, axis):
-        self.df.apply(lambda df: df.sum(), axis=axis)
-
-    def time_mean(self, impl, data_type, data_size, axis):
-        self.df.mean(axis=axis)
+    @trigger_execution
+    def time_mean(self, data_size, axis):
+        return self.df.mean(axis=axis)
diff --git a/modin/config/envvars.py b/modin/config/envvars.py
index 9eabcfc5cd9..76ab79e4190 100644
--- a/modin/config/envvars.py
+++ b/modin/config/envvars.py
@@ -221,6 +221,17 @@ class TrackFileLeaks(EnvironmentVariable, type=bool):
     default = sys.platform != "win32"
 
 
+class AsvImplementation(EnvironmentVariable, type=ExactStr):
+    """
+    Select the library that will be used for performance measurements.
+    """
+
+    varname = "MODIN_ASV_USE_IMPL"
+    choices = ("modin", "pandas")
+
+    default = "modin"
+
+
 def _check_vars():
     """
     Look out for any environment variables that start with "MODIN_" prefix
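
Editor's note on the pattern this patch introduces: Modin evaluates DataFrame operations lazily, so each benchmark method returns its result and the new `trigger_execution` decorator touches `.shape` to force materialization; without it, ASV would time only task submission rather than the computation itself. The standalone sketch below illustrates the same pattern under stated assumptions: the env-var values and the `sum_columns` helper are illustrative, not part of the patch.

    import os

    # Both variables must be set before `modin.pandas` is imported (see the
    # comment at the top of benchmarks.py); the values here are only examples.
    os.environ["MODIN_CPUS"] = "4"
    os.environ["MODIN_ASV_USE_IMPL"] = "modin"

    import numpy as np
    import modin.pandas as pd


    def trigger_execution(func):
        # Same idea as the decorator added in this patch: accessing `.shape`
        # materializes the lazily computed result, so the timed region covers
        # the actual work instead of just scheduling of deferred tasks.
        def real_executor(*args, **kwargs):
            return func(*args, **kwargs).shape

        return real_executor


    @trigger_execution
    def sum_columns(df):
        # Hypothetical stand-in for a benchmarked operation; any lazy op works.
        return df.sum(axis=0)


    if __name__ == "__main__":
        df = pd.DataFrame(np.random.randint(0, 100, size=(256, 256)))
        print(sum_columns(df))  # prints the shape of the materialized result

Because the benchmarks pass `ASV_USE_IMPL` to generate_dataframe, running the suite with `MODIN_ASV_USE_IMPL=pandas` switches the measured library without changing any benchmark bodies.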