fix: Contribution percentages for ECharts plugins (apache#28368)
michael-s-molina authored and jzhao62 committed May 16, 2024
1 parent 4a86e95 commit 705e344
Showing 4 changed files with 164 additions and 40 deletions.
@@ -22,12 +22,13 @@ import { PostProcessingFactory } from './types';
/* eslint-disable @typescript-eslint/no-unused-vars */
export const contributionOperator: PostProcessingFactory<
PostProcessingContribution
> = (formData, queryObject) => {
> = (formData, queryObject, time_shifts) => {
if (formData.contributionMode) {
return {
operation: 'contribution',
options: {
orientation: formData.contributionMode,
time_shifts,
},
};
}
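
For orientation, a sketch (not part of the diff) of the post-processing step this factory now emits once time_shifts is threaded through. The operation name and option keys come from the code above; the concrete values below are illustrative only.

# Sketch of the emitted post-processing step; "row" and "1 week ago" are illustrative values.
post_processing_step = {
    "operation": "contribution",
    "options": {
        "orientation": "row",           # formData.contributionMode
        "time_shifts": ["1 week ago"],  # the time_offsets forwarded from buildQuery
    },
}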
@@ -78,6 +78,10 @@ export default function buildQuery(formData: QueryFormData) {
...ensureIsArray(groupby),
];

const time_offsets = isTimeComparison(formData, baseQueryObject)
? formData.time_compare
: [];

return [
{
...baseQueryObject,
@@ -87,9 +91,7 @@ export default function buildQuery(formData: QueryFormData) {
...(isXAxisSet(formData) ? {} : { is_timeseries: true }),
// todo: move `normalizeOrderBy to extractQueryFields`
orderby: normalizeOrderBy(baseQueryObject).orderby,
time_offsets: isTimeComparison(formData, baseQueryObject)
? formData.time_compare
: [],
time_offsets,
/* Note that:
1. The resample, rolling, cum, timeCompare operators should be after pivot.
2. the flatOperator makes multiIndex Dataframe into flat Dataframe
@@ -100,7 +102,7 @@ export default function buildQuery(formData: QueryFormData) {
timeCompareOperator(formData, baseQueryObject),
resampleOperator(formData, baseQueryObject),
renameOperator(formData, baseQueryObject),
contributionOperator(formData, baseQueryObject),
contributionOperator(formData, baseQueryObject, time_offsets),
sortOperator(formData, baseQueryObject),
flattenOperator(formData, baseQueryObject),
// todo: move prophet before flatten
105 changes: 90 additions & 15 deletions superset/utils/pandas_postprocessing/contribution.py
@@ -15,10 +15,10 @@
# specific language governing permissions and limitations
# under the License.
from decimal import Decimal
from typing import Optional
from typing import Any

from flask_babel import gettext as _
from pandas import DataFrame
from pandas import DataFrame, MultiIndex

from superset.exceptions import InvalidPostProcessingError
from superset.utils.core import PostProcessingContributionOrientation
@@ -28,20 +28,24 @@
@validate_column_args("columns")
def contribution(
df: DataFrame,
orientation: Optional[
PostProcessingContributionOrientation
] = PostProcessingContributionOrientation.COLUMN,
columns: Optional[list[str]] = None,
rename_columns: Optional[list[str]] = None,
orientation: (
PostProcessingContributionOrientation | None
) = PostProcessingContributionOrientation.COLUMN,
columns: list[str] | None = None,
time_shifts: list[str] | None = None,
rename_columns: list[str] | None = None,
) -> DataFrame:
"""
Calculate cell contribution to row/column total for numeric columns.
Non-numeric columns will be kept untouched.
If `columns` are specified, only calculate contributions on selected columns.
Contribution for time shift columns will be calculated separately.
:param df: DataFrame containing all-numeric data (temporal column ignored)
:param columns: Columns to calculate values from.
:param time_shifts: The applied time shifts.
:param rename_columns: The new labels for the calculated contribution columns.
The original columns will not be removed.
:param orientation: calculate by dividing cell with row/column total
@@ -62,15 +66,86 @@ def contribution(
column=col,
)
)
columns = columns or numeric_df.columns
rename_columns = rename_columns or columns
if len(rename_columns) != len(columns):
actual_columns = columns or numeric_df.columns

rename_columns = rename_columns or actual_columns
if len(rename_columns) != len(actual_columns):
raise InvalidPostProcessingError(
_("`rename_columns` must have the same length as `columns`.")
_(
"`rename_columns` must have the same length as `columns` + `time_shift_columns`."
)
)
# limit to selected columns
numeric_df = numeric_df[columns]
axis = 0 if orientation == PostProcessingContributionOrientation.COLUMN else 1
numeric_df = numeric_df / numeric_df.values.sum(axis=axis, keepdims=True)
contribution_df[rename_columns] = numeric_df
numeric_df_view = numeric_df[actual_columns]

if orientation == PostProcessingContributionOrientation.COLUMN:
numeric_df_view = numeric_df_view / numeric_df_view.values.sum(
axis=0, keepdims=True
)
contribution_df[rename_columns] = numeric_df_view
return contribution_df

result = get_column_groups(numeric_df_view, time_shifts, rename_columns)
calculate_row_contribution(
contribution_df, result["non_time_shift"][0], result["non_time_shift"][1]
)
for time_shift in result["time_shifts"].items():
calculate_row_contribution(contribution_df, time_shift[1][0], time_shift[1][1])
return contribution_df


def get_column_groups(
df: DataFrame, time_shifts: list[str] | None, rename_columns: list[str]
) -> dict[str, Any]:
"""
Group columns based on whether they have a time shift.
:param df: DataFrame to group columns from
:param time_shifts: List of time shifts to group by
:param rename_columns: List of new column names
:return: Dictionary with two keys: 'non_time_shift' and 'time_shifts'. 'non_time_shift'
maps to a tuple of original and renamed columns without a time shift. 'time_shifts' maps
to a dictionary where each key is a time shift and each value is a tuple of original and
renamed columns with that time shift.
"""
result: dict[str, Any] = {
"non_time_shift": ([], []), # take the form of ([A, B, C], [X, Y, Z])
"time_shifts": {}, # take the form of {A: ([X], [Y]), B: ([Z], [W])}
}
for i, col in enumerate(df.columns):
col_0 = col[0] if isinstance(df.columns, MultiIndex) else col
time_shift = None
if time_shifts and isinstance(col_0, str):
for ts in time_shifts:
if col_0.endswith(ts):
time_shift = ts
break
if time_shift is not None:
if time_shift not in result["time_shifts"]:
result["time_shifts"][time_shift] = ([], [])
result["time_shifts"][time_shift][0].append(col)
result["time_shifts"][time_shift][1].append(rename_columns[i])
else:
result["non_time_shift"][0].append(col)
result["non_time_shift"][1].append(rename_columns[i])
return result
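
A quick usage sketch of get_column_groups; the column names and shift label are made up for illustration and this snippet is not part of the commit:

from pandas import DataFrame

df = DataFrame({"a": [1], "b": [2], "a__1 week ago": [3]})
groups = get_column_groups(df, ["1 week ago"], ["a", "b", "a__1 week ago"])
# groups["non_time_shift"] == (["a", "b"], ["a", "b"])
# groups["time_shifts"] == {"1 week ago": (["a__1 week ago"], ["a__1 week ago"])}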


def calculate_row_contribution(
df: DataFrame, columns: list[str], rename_columns: list[str]
) -> None:
"""
Calculate the contribution of each column to the row total and update the DataFrame.
This function calculates the contribution of each selected column to the total of the row,
and updates the DataFrame with these contribution percentages in place of the original values.
:param df: The DataFrame to calculate contributions for.
:param columns: A list of column names to calculate contributions for.
:param rename_columns: A list of new column names for the contribution columns.
"""
# calculate the row sum considering only the selected columns
row_sum_except_selected = df.loc[:, columns].sum(axis=1)

# update the dataframe cells with the row contribution percentage
df[rename_columns] = df.loc[:, columns].div(row_sum_except_selected, axis=0)
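
And a minimal sketch of calculate_row_contribution on a toy frame (values are illustrative, not from the commit):

from pandas import DataFrame

df = DataFrame({"a": [3.0, 6.0], "b": [1.0, 2.0]})
calculate_row_contribution(df, ["a", "b"], ["a", "b"])
# df["a"] is now [0.75, 0.75] and df["b"] is [0.25, 0.25]: each cell divided by its row total.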
86 changes: 66 additions & 20 deletions tests/unit_tests/pandas_postprocessing/test_contribution.py
@@ -26,37 +26,43 @@
from superset.utils.core import DTTM_ALIAS, PostProcessingContributionOrientation
from superset.utils.pandas_postprocessing import contribution

df_template = DataFrame(
{
DTTM_ALIAS: [
datetime(2020, 7, 16, 14, 49),
datetime(2020, 7, 16, 14, 50),
datetime(2020, 7, 16, 14, 51),
],
"a": [1, 3, nan],
"b": [1, 9, nan],
"c": [nan, nan, nan],
}
)

def test_contribution():
df = DataFrame(
{
DTTM_ALIAS: [
datetime(2020, 7, 16, 14, 49),
datetime(2020, 7, 16, 14, 50),
datetime(2020, 7, 16, 14, 51),
],
"a": [1, 3, nan],
"b": [1, 9, nan],
"c": [nan, nan, nan],
}
)

def test_non_numeric_columns():
with pytest.raises(InvalidPostProcessingError, match="not numeric"):
contribution(df, columns=[DTTM_ALIAS])
contribution(df_template.copy(), columns=[DTTM_ALIAS])


def test_rename_should_have_same_length():
with pytest.raises(InvalidPostProcessingError, match="same length"):
contribution(df, columns=["a"], rename_columns=["aa", "bb"])
contribution(df_template.copy(), columns=["a"], rename_columns=["aa", "bb"])

# cell contribution across row

def test_cell_contribution_across_row():
processed_df = contribution(
df,
df_template.copy(),
orientation=PostProcessingContributionOrientation.ROW,
)
assert processed_df.columns.tolist() == [DTTM_ALIAS, "a", "b", "c"]
assert_array_equal(processed_df["a"].tolist(), [0.5, 0.25, nan])
assert_array_equal(processed_df["b"].tolist(), [0.5, 0.75, nan])
assert_array_equal(processed_df["c"].tolist(), [0, 0, nan])
assert_array_equal(processed_df["c"].tolist(), [nan, nan, nan])


# cell contribution across column without temporal column
def test_cell_contribution_across_column_without_temporal_column():
df = df_template.copy()
df.pop(DTTM_ALIAS)
processed_df = contribution(
df, orientation=PostProcessingContributionOrientation.COLUMN
@@ -66,7 +72,10 @@ def test_contribution():
assert_array_equal(processed_df["b"].tolist(), [0.1, 0.9, 0])
assert_array_equal(processed_df["c"].tolist(), [nan, nan, nan])

# contribution only on selected columns

def test_contribution_on_selected_columns():
df = df_template.copy()
df.pop(DTTM_ALIAS)
processed_df = contribution(
df,
orientation=PostProcessingContributionOrientation.COLUMN,
@@ -78,3 +87,40 @@ def test_contribution():
assert_array_equal(processed_df["b"].tolist(), [1, 9, nan])
assert_array_equal(processed_df["c"].tolist(), [nan, nan, nan])
assert processed_df["pct_a"].tolist() == [0.25, 0.75, 0]


def test_contribution_with_time_shift_columns():
df = DataFrame(
{
DTTM_ALIAS: [
datetime(2020, 7, 16, 14, 49),
datetime(2020, 7, 16, 14, 50),
],
"a": [3, 6],
"b": [3, 3],
"c": [6, 3],
"a__1 week ago": [2, 2],
"b__1 week ago": [1, 1],
"c__1 week ago": [1, 1],
}
)
processed_df = contribution(
df,
orientation=PostProcessingContributionOrientation.ROW,
time_shifts=["1 week ago"],
)
assert processed_df.columns.tolist() == [
DTTM_ALIAS,
"a",
"b",
"c",
"a__1 week ago",
"b__1 week ago",
"c__1 week ago",
]
assert_array_equal(processed_df["a"].tolist(), [0.25, 0.5])
assert_array_equal(processed_df["b"].tolist(), [0.25, 0.25])
assert_array_equal(processed_df["c"].tolist(), [0.50, 0.25])
assert_array_equal(processed_df["a__1 week ago"].tolist(), [0.5, 0.5])
assert_array_equal(processed_df["b__1 week ago"].tolist(), [0.25, 0.25])
assert_array_equal(processed_df["c__1 week ago"].tolist(), [0.25, 0.25])
