diff --git a/superset-frontend/packages/superset-ui-chart-controls/src/operators/contributionOperator.ts b/superset-frontend/packages/superset-ui-chart-controls/src/operators/contributionOperator.ts index a72a17be6b628..a709dfb846a5a 100644 --- a/superset-frontend/packages/superset-ui-chart-controls/src/operators/contributionOperator.ts +++ b/superset-frontend/packages/superset-ui-chart-controls/src/operators/contributionOperator.ts @@ -22,12 +22,13 @@ import { PostProcessingFactory } from './types'; /* eslint-disable @typescript-eslint/no-unused-vars */ export const contributionOperator: PostProcessingFactory< PostProcessingContribution -> = (formData, queryObject) => { +> = (formData, queryObject, time_shifts) => { if (formData.contributionMode) { return { operation: 'contribution', options: { orientation: formData.contributionMode, + time_shifts, }, }; } diff --git a/superset-frontend/plugins/plugin-chart-echarts/src/Timeseries/buildQuery.ts b/superset-frontend/plugins/plugin-chart-echarts/src/Timeseries/buildQuery.ts index 69a8020657b8a..781d5678e225e 100644 --- a/superset-frontend/plugins/plugin-chart-echarts/src/Timeseries/buildQuery.ts +++ b/superset-frontend/plugins/plugin-chart-echarts/src/Timeseries/buildQuery.ts @@ -78,6 +78,10 @@ export default function buildQuery(formData: QueryFormData) { ...ensureIsArray(groupby), ]; + const time_offsets = isTimeComparison(formData, baseQueryObject) + ? formData.time_compare + : []; + return [ { ...baseQueryObject, @@ -87,9 +91,7 @@ export default function buildQuery(formData: QueryFormData) { ...(isXAxisSet(formData) ? {} : { is_timeseries: true }), // todo: move `normalizeOrderBy to extractQueryFields` orderby: normalizeOrderBy(baseQueryObject).orderby, - time_offsets: isTimeComparison(formData, baseQueryObject) - ? formData.time_compare - : [], + time_offsets, /* Note that: 1. The resample, rolling, cum, timeCompare operators should be after pivot. 2. the flatOperator makes multiIndex Dataframe into flat Dataframe @@ -100,7 +102,7 @@ export default function buildQuery(formData: QueryFormData) { timeCompareOperator(formData, baseQueryObject), resampleOperator(formData, baseQueryObject), renameOperator(formData, baseQueryObject), - contributionOperator(formData, baseQueryObject), + contributionOperator(formData, baseQueryObject, time_offsets), sortOperator(formData, baseQueryObject), flattenOperator(formData, baseQueryObject), // todo: move prophet before flatten diff --git a/superset/utils/pandas_postprocessing/contribution.py b/superset/utils/pandas_postprocessing/contribution.py index d383312f751a4..89a1413b74eab 100644 --- a/superset/utils/pandas_postprocessing/contribution.py +++ b/superset/utils/pandas_postprocessing/contribution.py @@ -15,10 +15,10 @@ # specific language governing permissions and limitations # under the License. from decimal import Decimal -from typing import Optional +from typing import Any from flask_babel import gettext as _ -from pandas import DataFrame +from pandas import DataFrame, MultiIndex from superset.exceptions import InvalidPostProcessingError from superset.utils.core import PostProcessingContributionOrientation @@ -28,11 +28,12 @@ @validate_column_args("columns") def contribution( df: DataFrame, - orientation: Optional[ - PostProcessingContributionOrientation - ] = PostProcessingContributionOrientation.COLUMN, - columns: Optional[list[str]] = None, - rename_columns: Optional[list[str]] = None, + orientation: ( + PostProcessingContributionOrientation | None + ) = PostProcessingContributionOrientation.COLUMN, + columns: list[str] | None = None, + time_shifts: list[str] | None = None, + rename_columns: list[str] | None = None, ) -> DataFrame: """ Calculate cell contribution to row/column total for numeric columns. @@ -40,8 +41,11 @@ def contribution( If `columns` are specified, only calculate contributions on selected columns. + Contribution for time shift columns will be calculated separately. + :param df: DataFrame containing all-numeric data (temporal column ignored) :param columns: Columns to calculate values from. + :param time_shifts: The applied time shifts. :param rename_columns: The new labels for the calculated contribution columns. The original columns will not be removed. :param orientation: calculate by dividing cell with row/column total @@ -62,15 +66,86 @@ def contribution( column=col, ) ) - columns = columns or numeric_df.columns - rename_columns = rename_columns or columns - if len(rename_columns) != len(columns): + actual_columns = columns or numeric_df.columns + + rename_columns = rename_columns or actual_columns + if len(rename_columns) != len(actual_columns): raise InvalidPostProcessingError( - _("`rename_columns` must have the same length as `columns`.") + _( + "`rename_columns` must have the same length as `columns` + `time_shift_columns`." + ) ) # limit to selected columns - numeric_df = numeric_df[columns] - axis = 0 if orientation == PostProcessingContributionOrientation.COLUMN else 1 - numeric_df = numeric_df / numeric_df.values.sum(axis=axis, keepdims=True) - contribution_df[rename_columns] = numeric_df + numeric_df_view = numeric_df[actual_columns] + + if orientation == PostProcessingContributionOrientation.COLUMN: + numeric_df_view = numeric_df_view / numeric_df_view.values.sum( + axis=0, keepdims=True + ) + contribution_df[rename_columns] = numeric_df_view + return contribution_df + + result = get_column_groups(numeric_df_view, time_shifts, rename_columns) + calculate_row_contribution( + contribution_df, result["non_time_shift"][0], result["non_time_shift"][1] + ) + for time_shift in result["time_shifts"].items(): + calculate_row_contribution(contribution_df, time_shift[1][0], time_shift[1][1]) return contribution_df + + +def get_column_groups( + df: DataFrame, time_shifts: list[str] | None, rename_columns: list[str] +) -> dict[str, Any]: + """ + Group columns based on whether they have a time shift. + + :param df: DataFrame to group columns from + :param time_shifts: List of time shifts to group by + :param rename_columns: List of new column names + :return: Dictionary with two keys: 'non_time_shift' and 'time_shifts'. 'non_time_shift' + maps to a tuple of original and renamed columns without a time shift. 'time_shifts' maps + to a dictionary where each key is a time shift and each value is a tuple of original and + renamed columns with that time shift. + """ + result: dict[str, Any] = { + "non_time_shift": ([], []), # take the form of ([A, B, C], [X, Y, Z]) + "time_shifts": {}, # take the form of {A: ([X], [Y]), B: ([Z], [W])} + } + for i, col in enumerate(df.columns): + col_0 = col[0] if isinstance(df.columns, MultiIndex) else col + time_shift = None + if time_shifts and isinstance(col_0, str): + for ts in time_shifts: + if col_0.endswith(ts): + time_shift = ts + break + if time_shift is not None: + if time_shift not in result["time_shifts"]: + result["time_shifts"][time_shift] = ([], []) + result["time_shifts"][time_shift][0].append(col) + result["time_shifts"][time_shift][1].append(rename_columns[i]) + else: + result["non_time_shift"][0].append(col) + result["non_time_shift"][1].append(rename_columns[i]) + return result + + +def calculate_row_contribution( + df: DataFrame, columns: list[str], rename_columns: list[str] +) -> None: + """ + Calculate the contribution of each column to the row total and update the DataFrame. + + This function calculates the contribution of each selected column to the total of the row, + and updates the DataFrame with these contribution percentages in place of the original values. + + :param df: The DataFrame to calculate contributions for. + :param columns: A list of column names to calculate contributions for. + :param rename_columns: A list of new column names for the contribution columns. + """ + # calculate the row sum considering only the selected columns + row_sum_except_selected = df.loc[:, columns].sum(axis=1) + + # update the dataframe cells with the row contribution percentage + df[rename_columns] = df.loc[:, columns].div(row_sum_except_selected, axis=0) diff --git a/tests/unit_tests/pandas_postprocessing/test_contribution.py b/tests/unit_tests/pandas_postprocessing/test_contribution.py index 7eb34c4d13f7b..7853fc97f29e7 100644 --- a/tests/unit_tests/pandas_postprocessing/test_contribution.py +++ b/tests/unit_tests/pandas_postprocessing/test_contribution.py @@ -26,37 +26,43 @@ from superset.utils.core import DTTM_ALIAS, PostProcessingContributionOrientation from superset.utils.pandas_postprocessing import contribution +df_template = DataFrame( + { + DTTM_ALIAS: [ + datetime(2020, 7, 16, 14, 49), + datetime(2020, 7, 16, 14, 50), + datetime(2020, 7, 16, 14, 51), + ], + "a": [1, 3, nan], + "b": [1, 9, nan], + "c": [nan, nan, nan], + } +) -def test_contribution(): - df = DataFrame( - { - DTTM_ALIAS: [ - datetime(2020, 7, 16, 14, 49), - datetime(2020, 7, 16, 14, 50), - datetime(2020, 7, 16, 14, 51), - ], - "a": [1, 3, nan], - "b": [1, 9, nan], - "c": [nan, nan, nan], - } - ) + +def test_non_numeric_columns(): with pytest.raises(InvalidPostProcessingError, match="not numeric"): - contribution(df, columns=[DTTM_ALIAS]) + contribution(df_template.copy(), columns=[DTTM_ALIAS]) + +def test_rename_should_have_same_length(): with pytest.raises(InvalidPostProcessingError, match="same length"): - contribution(df, columns=["a"], rename_columns=["aa", "bb"]) + contribution(df_template.copy(), columns=["a"], rename_columns=["aa", "bb"]) - # cell contribution across row + +def test_cell_contribution_across_row(): processed_df = contribution( - df, + df_template.copy(), orientation=PostProcessingContributionOrientation.ROW, ) assert processed_df.columns.tolist() == [DTTM_ALIAS, "a", "b", "c"] assert_array_equal(processed_df["a"].tolist(), [0.5, 0.25, nan]) assert_array_equal(processed_df["b"].tolist(), [0.5, 0.75, nan]) - assert_array_equal(processed_df["c"].tolist(), [0, 0, nan]) + assert_array_equal(processed_df["c"].tolist(), [nan, nan, nan]) + - # cell contribution across column without temporal column +def test_cell_contribution_across_column_without_temporal_column(): + df = df_template.copy() df.pop(DTTM_ALIAS) processed_df = contribution( df, orientation=PostProcessingContributionOrientation.COLUMN @@ -66,7 +72,10 @@ def test_contribution(): assert_array_equal(processed_df["b"].tolist(), [0.1, 0.9, 0]) assert_array_equal(processed_df["c"].tolist(), [nan, nan, nan]) - # contribution only on selected columns + +def test_contribution_on_selected_columns(): + df = df_template.copy() + df.pop(DTTM_ALIAS) processed_df = contribution( df, orientation=PostProcessingContributionOrientation.COLUMN, @@ -78,3 +87,40 @@ def test_contribution(): assert_array_equal(processed_df["b"].tolist(), [1, 9, nan]) assert_array_equal(processed_df["c"].tolist(), [nan, nan, nan]) assert processed_df["pct_a"].tolist() == [0.25, 0.75, 0] + + +def test_contribution_with_time_shift_columns(): + df = DataFrame( + { + DTTM_ALIAS: [ + datetime(2020, 7, 16, 14, 49), + datetime(2020, 7, 16, 14, 50), + ], + "a": [3, 6], + "b": [3, 3], + "c": [6, 3], + "a__1 week ago": [2, 2], + "b__1 week ago": [1, 1], + "c__1 week ago": [1, 1], + } + ) + processed_df = contribution( + df, + orientation=PostProcessingContributionOrientation.ROW, + time_shifts=["1 week ago"], + ) + assert processed_df.columns.tolist() == [ + DTTM_ALIAS, + "a", + "b", + "c", + "a__1 week ago", + "b__1 week ago", + "c__1 week ago", + ] + assert_array_equal(processed_df["a"].tolist(), [0.25, 0.5]) + assert_array_equal(processed_df["b"].tolist(), [0.25, 0.25]) + assert_array_equal(processed_df["c"].tolist(), [0.50, 0.25]) + assert_array_equal(processed_df["a__1 week ago"].tolist(), [0.5, 0.5]) + assert_array_equal(processed_df["b__1 week ago"].tolist(), [0.25, 0.25]) + assert_array_equal(processed_df["c__1 week ago"].tolist(), [0.25, 0.25])