
fix: Time shifts with different granularity for ECharts #24176

Merged · 15 commits · Jun 8, 2023
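
The change below makes time-comparison queries join their offset results on a time-grain-aware key instead of the raw temporal column. A minimal standalone pandas sketch of the idea (illustrative data and column names, not the PR's code):

    import pandas as pd

    # Current-period series at month grain
    df = pd.DataFrame({
        "ds": pd.to_datetime(["2023-01-01", "2023-02-01"]),
        "sum__value": [10, 20],
    })
    # "1 year ago" series, shifted onto the current period's axis; boundaries drift
    offset = pd.DataFrame({
        "ds": pd.to_datetime(["2023-01-01", "2023-02-03"]),
        "sum__value__1 year ago": [7, 13],
    })

    # Joining on the raw timestamp leaves the drifted row's comparison empty (NaN)...
    print(df.merge(offset, on="ds", how="left"))

    # ...while joining on a month-grain key aligns both rows
    for frame in (df, offset):
        frame["__aggregated_join_column"] = frame["ds"].dt.strftime("%Y-%m")
    joined = df.merge(
        offset.drop(columns="ds"), on="__aggregated_join_column", how="left"
    ).drop(columns="__aggregated_join_column")
    print(joined)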
159 changes: 133 additions & 26 deletions superset/common/query_context_processor.py
@@ -37,7 +37,7 @@
from superset.common.utils.query_cache_manager import QueryCacheManager
from superset.common.utils.time_range_utils import get_since_until_from_query_object
from superset.connectors.base.models import BaseDatasource
-from superset.constants import CacheRegion
+from superset.constants import CacheRegion, TimeGrain
from superset.exceptions import (
InvalidPostProcessingError,
QueryObjectValidationError,
@@ -74,6 +74,27 @@
stats_logger: BaseStatsLogger = config["STATS_LOGGER"]
logger = logging.getLogger(__name__)

+# Temporary column used for joining aggregated offset results
+AGGREGATED_JOIN_COLUMN = "__aggregated_join_column"
+
+# This only includes time grains that may influence
+# the temporal column used for joining offset results.
+# Given that we don't allow time shifts smaller than a day,
+# we don't need to include aggregations for smaller time grains.
+AGGREGATED_JOIN_GRAINS = {
+    TimeGrain.WEEK,
+    TimeGrain.WEEK_STARTING_SUNDAY,
+    TimeGrain.WEEK_STARTING_MONDAY,
+    TimeGrain.WEEK_ENDING_SATURDAY,
+    TimeGrain.WEEK_ENDING_SUNDAY,
+    TimeGrain.MONTH,
+    TimeGrain.QUARTER,
+    TimeGrain.YEAR,
+}
+
+# Right suffix used for joining offset results
+R_SUFFIX = "__right_suffix"
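
Why week-and-coarser grains need this: shifting a bucket-start timestamp by a whole year (or month) rarely lands on the shifted period's bucket start, so an exact timestamp join misses, while a formatted key still matches. A quick check (dates chosen for illustration):

    import pandas as pd
    from pandas.tseries.offsets import DateOffset

    current_week = pd.Timestamp("2023-03-06")  # Monday, start of a weekly bucket
    offset_week = pd.Timestamp("2022-03-07") + DateOffset(years=1)  # last year's Monday, shifted forward
    print(current_week == offset_week)  # False: the timestamps drift by a day
    print(current_week.strftime("%Y-W%W") == offset_week.strftime("%Y-W%W"))  # True: same weekly bucket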


class CachedTimeOffset(TypedDict):
df: pd.DataFrame
@@ -89,10 +110,6 @@ class QueryContextProcessor:

_query_context: QueryContext
_qc_datasource: BaseDatasource
"""
The query context contains the query object and additional fields necessary
to retrieve the data payload for a given viz.
"""

def __init__(self, query_context: QueryContext):
self._query_context = query_context
@@ -307,6 +324,35 @@ def _get_timestamp_format(

return df

+    @staticmethod
+    def get_time_grain(query_object: QueryObject) -> Any | None:
+        if (
+            query_object.columns
+            and len(query_object.columns) > 0
+            and isinstance(query_object.columns[0], dict)
+        ):
+            # If the time grain is present in the columns, it will be the
+            # first one, and it will be of AdhocColumn type
+            return query_object.columns[0].get("timeGrain")
+
+        return query_object.extras.get("time_grain_sqla")
+
+    def add_aggregated_join_column(
+        self,
+        df: pd.DataFrame,
+        time_grain: str,
+        join_column_producer: Any = None,
+    ) -> None:
+        if join_column_producer:
+            df[AGGREGATED_JOIN_COLUMN] = df.apply(
+                lambda row: join_column_producer(row, 0), axis=1
+            )
+        else:
+            df[AGGREGATED_JOIN_COLUMN] = df.apply(
+                lambda row: self.get_aggregated_join_column(row, 0, time_grain),
+                axis=1,
+            )
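
For reference, the two shapes get_time_grain handles, as a hedged illustration with made-up values (the field names match the lookups above):

    # x-axis defined as an adhoc column: the grain travels with the column
    query_object_columns = [
        {"sqlExpression": "order_date", "label": "order_date", "timeGrain": "P1M"}
    ]
    # otherwise the grain comes from the extras dict
    query_object_extras = {"time_grain_sqla": "P1M"}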

def processing_time_offsets( # pylint: disable=too-many-locals,too-many-statements
self,
df: pd.DataFrame,
@@ -317,9 +363,8 @@
query_object_clone = copy.copy(query_object)
queries: list[str] = []
cache_keys: list[str | None] = []
-        rv_dfs: list[pd.DataFrame] = [df]
+        offset_dfs: list[pd.DataFrame] = []

-        time_offsets = query_object.time_offsets
outer_from_dttm, outer_to_dttm = get_since_until_from_query_object(query_object)
if not outer_from_dttm or not outer_to_dttm:
raise QueryObjectValidationError(
@@ -328,7 +373,31 @@
"when using a Time Comparison."
)
)
-        for offset in time_offsets:
+
+        columns = df.columns
+        time_grain = self.get_time_grain(query_object)
+
+        if not time_grain:
+            raise QueryObjectValidationError(
+                _("Time Grain must be specified when using Time Shift.")
+            )
+
+        join_column_producer = config["TIME_GRAIN_JOIN_COLUMN_PRODUCERS"].get(
+            time_grain
+        )
+        use_aggregated_join_column = (
+            join_column_producer or time_grain in AGGREGATED_JOIN_GRAINS
+        )
+        if use_aggregated_join_column:
+            self.add_aggregated_join_column(df, time_grain, join_column_producer)
+            # Skip the first column, which is the temporal column, because
+            # we'll use the aggregated join column instead
+            columns = df.columns[1:]
+
+        metric_names = get_metric_names(query_object.metrics)
+        join_keys = [col for col in columns if col not in metric_names]
+
+        for offset in query_object.time_offsets:
try:
# pylint: disable=line-too-long
# Since the xaxis is also a column name for the time filter, xaxis_label will be set as granularity
@@ -364,13 +433,15 @@
]

# `offset` is added to the hash function
-            cache_key = self.query_cache_key(query_object_clone, time_offset=offset)
+            cache_key = self.query_cache_key(
+                query_object_clone, time_offset=offset, time_grain=time_grain
+            )
cache = QueryCacheManager.get(
cache_key, CacheRegion.DATA, query_context.force
)
# whether hit on the cache
if cache.is_loaded:
-                rv_dfs.append(cache.df)
+                offset_dfs.append(cache.df)
queries.append(cache.query)
cache_keys.append(cache_key)
continue
@@ -379,11 +450,8 @@
# rename metrics: SUM(value) => SUM(value) 1 year ago
metrics_mapping = {
metric: TIME_COMPARISON.join([metric, offset])
-                for metric in get_metric_names(
-                    query_object_clone_dct.get("metrics", [])
-                )
+                for metric in metric_names
            }
-            join_keys = [col for col in df.columns if col not in metrics_mapping.keys()]

if isinstance(self._qc_datasource, Query):
result = self._qc_datasource.exc_query(query_object_clone_dct)
@@ -420,21 +488,19 @@
)
)

# modifies temporal column using offset
offset_metrics_df[index] = offset_metrics_df[index] - DateOffset(
**normalize_time_delta(offset)
)
Comment on lines 492 to 494:

Member: The index of offset_metrics_df seems to be constructing an appropriate index for the resulting dataframe. If the alignment of the results is incorrect, I think we should look for the root cause here.

Member (Author): I'm not sure I understand what you're suggesting. Can you explain more?

-            # df left join `offset_metrics_df`
-            offset_df = dataframe_utils.left_join_df(
-                left_df=df,
-                right_df=offset_metrics_df,
-                join_keys=join_keys,
-            )
-            offset_slice = offset_df[metrics_mapping.values()]
+            if use_aggregated_join_column:
+                self.add_aggregated_join_column(
+                    offset_metrics_df, time_grain, join_column_producer
+                )

-            # set offset_slice to cache and stack.
+            # cache df and query
            value = {
-                "df": offset_slice,
+                "df": offset_metrics_df,
"query": result.query,
}
cache.set(
@@ -444,10 +510,51 @@
datasource_uid=query_context.datasource.uid,
region=CacheRegion.DATA,
)
-            rv_dfs.append(offset_slice)
+            offset_dfs.append(offset_metrics_df)

+        if offset_dfs:
+            # iterate on offset_dfs, left join each with df
+            for offset_df in offset_dfs:
+                df = dataframe_utils.left_join_df(
+                    left_df=df,
+                    right_df=offset_df,
+                    join_keys=join_keys,
+                    rsuffix=R_SUFFIX,
+                )

-        rv_df = pd.concat(rv_dfs, axis=1, copy=False) if time_offsets else df
-        return CachedTimeOffset(df=rv_df, queries=queries, cache_keys=cache_keys)
+        # removes columns used for join
+        df.drop(
+            list(df.filter(regex=f"{AGGREGATED_JOIN_COLUMN}|{R_SUFFIX}")),
+            axis=1,
+            inplace=True,
+        )
+
+        return CachedTimeOffset(df=df, queries=queries, cache_keys=cache_keys)
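
A condensed sketch of the join-then-drop mechanics above (standalone, with made-up frames; AGG and R_SUFFIX stand in for the module constants defined earlier):

    import pandas as pd

    AGG, R_SUFFIX = "__aggregated_join_column", "__right_suffix"

    df = pd.DataFrame({
        "__timestamp": pd.to_datetime(["2023-01-01", "2023-02-01"]),
        AGG: ["2023-01", "2023-02"],
        "sum__value": [10, 20],
    })
    offset_df = pd.DataFrame({
        "__timestamp": pd.to_datetime(["2023-01-03", "2023-02-03"]),  # drifted
        AGG: ["2023-01", "2023-02"],
        "sum__value__1 year ago": [7, 13],
    })

    joined = df.set_index([AGG]).join(
        offset_df.set_index([AGG]), rsuffix=R_SUFFIX
    ).reset_index()
    # The offset frame's temporal column collides and is kept under R_SUFFIX;
    # dropping by regex removes it along with the helper join key.
    joined.drop(list(joined.filter(regex=f"{AGG}|{R_SUFFIX}")), axis=1, inplace=True)
    print(joined.columns.tolist())  # ['__timestamp', 'sum__value', 'sum__value__1 year ago']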

+    @staticmethod
+    def get_aggregated_join_column(
+        row: pd.Series, column_index: int, time_grain: str
+    ) -> str:
+        if time_grain in (
+            TimeGrain.WEEK_STARTING_SUNDAY,
+            TimeGrain.WEEK_ENDING_SATURDAY,
+        ):
+            return row[column_index].strftime("%Y-W%U")
+
+        if time_grain in (
+            TimeGrain.WEEK,
+            TimeGrain.WEEK_STARTING_MONDAY,
+            TimeGrain.WEEK_ENDING_SUNDAY,
+        ):
+            return row[column_index].strftime("%Y-W%W")
+
+        if time_grain == TimeGrain.MONTH:
+            return row[column_index].strftime("%Y-%m")
+
+        if time_grain == TimeGrain.QUARTER:
+            return row[column_index].strftime("%Y-Q") + str(row[column_index].quarter)
+
+        return row[column_index].strftime("%Y")

def get_data(self, df: pd.DataFrame) -> str | list[dict[str, Any]]:
if self._query_context.result_format in ChartDataResultFormat.table_like():
6 changes: 5 additions & 1 deletion superset/common/utils/dataframe_utils.py
@@ -30,8 +30,12 @@ def left_join_df(
left_df: pd.DataFrame,
right_df: pd.DataFrame,
join_keys: list[str],
+    lsuffix: str = "",
+    rsuffix: str = "",
) -> pd.DataFrame:
-    df = left_df.set_index(join_keys).join(right_df.set_index(join_keys))
+    df = left_df.set_index(join_keys).join(
+        right_df.set_index(join_keys), lsuffix=lsuffix, rsuffix=rsuffix
+    )
df.reset_index(inplace=True)
return df
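
Usage sketch of the extended helper (runs in an environment where Superset is importable; the frames are made up):

    import pandas as pd
    from superset.common.utils import dataframe_utils

    left = pd.DataFrame({"k": [1, 2], "v": [10, 20]})
    right = pd.DataFrame({"k": [1, 2], "v": [30, 40]})
    out = dataframe_utils.left_join_df(left, right, join_keys=["k"], rsuffix="_r")
    print(out.columns.tolist())  # ['k', 'v', 'v_r']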

12 changes: 12 additions & 0 deletions superset/config.py
@@ -41,6 +41,7 @@
from dateutil import tz
from flask import Blueprint
from flask_appbuilder.security.manager import AUTH_DB
+from pandas import Series
from pandas._libs.parsers import STR_NA_VALUES # pylint: disable=no-name-in-module
from sqlalchemy.orm.query import Query

@@ -773,6 +774,17 @@ class D3Format(TypedDict, total=False):
# }
TIME_GRAIN_ADDON_EXPRESSIONS: dict[str, dict[str, str]] = {}

+# Map of custom time grains and artificial join column producers used
+# when generating the join key between results and time shifts.
+# See superset/common/query_context_processor.get_aggregated_join_column
+#
+# Example of a join column producer that aggregates by fiscal year
+# def join_producer(row: Series, column_index: int) -> str:
+#     return row[column_index].strftime("%F")
+#
+# TIME_GRAIN_JOIN_COLUMN_PRODUCERS = {"P1F": join_producer}
+TIME_GRAIN_JOIN_COLUMN_PRODUCERS: dict[str, Callable[[Series, int], str]] = {}
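
A runnable sketch of a producer one might register here (the "P1F" grain and the April-start fiscal-year rule are hypothetical):

    from pandas import Series, Timestamp

    def fiscal_year_producer(row: Series, column_index: int) -> str:
        # Hypothetical fiscal year starting in April: 2023-03-31 -> "FY2022"
        ts: Timestamp = row[column_index]
        return f"FY{ts.year if ts.month >= 4 else ts.year - 1}"

    # TIME_GRAIN_JOIN_COLUMN_PRODUCERS = {"P1F": fiscal_year_producer}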

# ---------------------------------------------------
# List of viz_types not allowed in your environment
# For example: Disable pivot table and treemap:
24 changes: 24 additions & 0 deletions superset/constants.py
@@ -186,6 +186,30 @@ class RouteMethod:  # pylint: disable=too-few-public-methods
)


+class TimeGrain(str, Enum):
+    SECOND = "PT1S"
+    FIVE_SECONDS = "PT5S"
+    THIRTY_SECONDS = "PT30S"
+    MINUTE = "PT1M"
+    FIVE_MINUTES = "PT5M"
+    TEN_MINUTES = "PT10M"
+    FIFTEEN_MINUTES = "PT15M"
+    THIRTY_MINUTES = "PT30M"
+    HALF_HOUR = "PT0.5H"
+    HOUR = "PT1H"
+    SIX_HOURS = "PT6H"
+    DAY = "P1D"
+    WEEK = "P1W"
+    WEEK_STARTING_SUNDAY = "1969-12-28T00:00:00Z/P1W"
+    WEEK_STARTING_MONDAY = "1969-12-29T00:00:00Z/P1W"
+    WEEK_ENDING_SATURDAY = "P1W/1970-01-03T00:00:00Z"
+    WEEK_ENDING_SUNDAY = "P1W/1970-01-04T00:00:00Z"
+    MONTH = "P1M"
+    QUARTER = "P3M"
+    QUARTER_YEAR = "P0.25Y"
+    YEAR = "P1Y"


class PandasAxis(int, Enum):
ROW = 0
COLUMN = 1
17 changes: 9 additions & 8 deletions superset/db_engine_specs/ascend.py
@@ -16,6 +16,7 @@
# under the License.
from sqlalchemy.dialects import registry

+from superset.constants import TimeGrain
from superset.db_engine_specs.impala import ImpalaEngineSpec


@@ -29,12 +30,12 @@ class AscendEngineSpec(ImpalaEngineSpec):

_time_grain_expressions = {
None: "{col}",
"PT1S": "DATE_TRUNC('second', {col})",
"PT1M": "DATE_TRUNC('minute', {col})",
"PT1H": "DATE_TRUNC('hour', {col})",
"P1D": "DATE_TRUNC('day', {col})",
"P1W": "DATE_TRUNC('week', {col})",
"P1M": "DATE_TRUNC('month', {col})",
"P3M": "DATE_TRUNC('quarter', {col})",
"P1Y": "DATE_TRUNC('year', {col})",
TimeGrain.SECOND: "DATE_TRUNC('second', {col})",
TimeGrain.MINUTE: "DATE_TRUNC('minute', {col})",
TimeGrain.HOUR: "DATE_TRUNC('hour', {col})",
TimeGrain.DAY: "DATE_TRUNC('day', {col})",
TimeGrain.WEEK: "DATE_TRUNC('week', {col})",
TimeGrain.MONTH: "DATE_TRUNC('month', {col})",
TimeGrain.QUARTER: "DATE_TRUNC('quarter', {col})",
TimeGrain.YEAR: "DATE_TRUNC('year', {col})",
}
21 changes: 11 additions & 10 deletions superset/db_engine_specs/athena.py
@@ -22,6 +22,7 @@
from flask_babel import gettext as __
from sqlalchemy import types

+from superset.constants import TimeGrain
from superset.db_engine_specs.base import BaseEngineSpec
from superset.errors import SupersetErrorType

@@ -38,17 +39,17 @@

_time_grain_expressions = {
None: "{col}",
"PT1S": "date_trunc('second', CAST({col} AS TIMESTAMP))",
"PT1M": "date_trunc('minute', CAST({col} AS TIMESTAMP))",
"PT1H": "date_trunc('hour', CAST({col} AS TIMESTAMP))",
"P1D": "date_trunc('day', CAST({col} AS TIMESTAMP))",
"P1W": "date_trunc('week', CAST({col} AS TIMESTAMP))",
"P1M": "date_trunc('month', CAST({col} AS TIMESTAMP))",
"P3M": "date_trunc('quarter', CAST({col} AS TIMESTAMP))",
"P1Y": "date_trunc('year', CAST({col} AS TIMESTAMP))",
"P1W/1970-01-03T00:00:00Z": "date_add('day', 5, date_trunc('week', \
TimeGrain.SECOND: "date_trunc('second', CAST({col} AS TIMESTAMP))",
TimeGrain.MINUTE: "date_trunc('minute', CAST({col} AS TIMESTAMP))",
TimeGrain.HOUR: "date_trunc('hour', CAST({col} AS TIMESTAMP))",
TimeGrain.DAY: "date_trunc('day', CAST({col} AS TIMESTAMP))",
TimeGrain.WEEK: "date_trunc('week', CAST({col} AS TIMESTAMP))",
TimeGrain.MONTH: "date_trunc('month', CAST({col} AS TIMESTAMP))",
TimeGrain.QUARTER: "date_trunc('quarter', CAST({col} AS TIMESTAMP))",
TimeGrain.YEAR: "date_trunc('year', CAST({col} AS TIMESTAMP))",
TimeGrain.WEEK_ENDING_SATURDAY: "date_add('day', 5, date_trunc('week', \
date_add('day', 1, CAST({col} AS TIMESTAMP))))",
"1969-12-28T00:00:00Z/P1W": "date_add('day', -1, date_trunc('week', \
TimeGrain.WEEK_STARTING_SUNDAY: "date_add('day', -1, date_trunc('week', \
date_add('day', 1, CAST({col} AS TIMESTAMP))))",
}
