diff --git a/superset/config.py b/superset/config.py index 0976e46c4efae..cb9b7f65162ed 100644 --- a/superset/config.py +++ b/superset/config.py @@ -922,6 +922,14 @@ class CeleryConfig: # pylint: disable=too-few-public-methods # Used for Alerts/Reports (Feature flask ALERT_REPORTS) to set the size for the # sliding cron window size, should be synced with the celery beat config minus 1 second ALERT_REPORTS_CRON_WINDOW_SIZE = 59 +ALERT_REPORTS_WORKING_TIME_OUT_KILL = True +# if ALERT_REPORTS_WORKING_TIME_OUT_KILL is True, set a celery hard timeout +# Equal to working timeout + ALERT_REPORTS_WORKING_TIME_OUT_LAG +ALERT_REPORTS_WORKING_TIME_OUT_LAG = 10 +# if ALERT_REPORTS_WORKING_TIME_OUT_KILL is True, set a celery hard timeout +# Equal to working timeout + ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG +ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG = 1 + # A custom prefix to use on all Alerts & Reports emails EMAIL_REPORTS_SUBJECT_PREFIX = "[Report] " diff --git a/superset/reports/commands/alert.py b/superset/reports/commands/alert.py index 076ebfb40973d..469e85c189fc5 100644 --- a/superset/reports/commands/alert.py +++ b/superset/reports/commands/alert.py @@ -17,6 +17,7 @@ import json import logging from operator import eq, ge, gt, le, lt, ne +from timeit import default_timer from typing import Optional import numpy as np @@ -145,7 +146,15 @@ def _execute_query(self) -> pd.DataFrame: limited_rendered_sql = self._report_schedule.database.apply_limit_to_sql( rendered_sql, ALERT_SQL_LIMIT ) - return self._report_schedule.database.get_df(limited_rendered_sql) + start = default_timer() + df = self._report_schedule.database.get_df(limited_rendered_sql) + stop = default_timer() + logger.info( + "Query for %s took %.2f ms", + self._report_schedule.name, + (stop - start) * 1000.0, + ) + return df except SoftTimeLimitExceeded: raise AlertQueryTimeout() except Exception as ex: diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py index c3a3d6765f0d9..e3352d241f95b 100644 --- a/superset/reports/commands/execute.py +++ b/superset/reports/commands/execute.py @@ -280,12 +280,17 @@ def is_on_working_timeout(self) -> bool: """ Checks if an alert is on a working timeout """ + last_working = ReportScheduleDAO.find_last_entered_working_log( + self._report_schedule, session=self._session + ) + if not last_working: + return False return ( self._report_schedule.working_timeout is not None and self._report_schedule.last_eval_dttm is not None and datetime.utcnow() - timedelta(seconds=self._report_schedule.working_timeout) - > self._report_schedule.last_eval_dttm + > last_working.end_dttm ) def next(self) -> None: diff --git a/superset/reports/dao.py b/superset/reports/dao.py index 5ffa0f0f10aa2..0f729c3d022d3 100644 --- a/superset/reports/dao.py +++ b/superset/reports/dao.py @@ -226,6 +226,25 @@ def find_last_success_log( .first() ) + @staticmethod + def find_last_entered_working_log( + report_schedule: ReportSchedule, session: Optional[Session] = None, + ) -> Optional[ReportExecutionLog]: + """ + Finds last success execution log for a given report + """ + session = session or db.session + return ( + session.query(ReportExecutionLog) + .filter( + ReportExecutionLog.state == ReportState.WORKING, + ReportExecutionLog.report_schedule == report_schedule, + ReportExecutionLog.error_message.is_(None), + ) + .order_by(ReportExecutionLog.end_dttm.desc()) + .first() + ) + @staticmethod def find_last_error_notification( report_schedule: ReportSchedule, session: Optional[Session] = None, diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 73005e77548e3..84026aefb40c6 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -57,7 +57,20 @@ def scheduler() -> None: logger.info( "Scheduling alert %s eta: %s", active_schedule.name, schedule ) - execute.apply_async((active_schedule.id, schedule,), eta=schedule) + async_options = {"eta": schedule} + if ( + active_schedule.working_timeout is not None + and app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] + ): + async_options["time_limit"] = ( + active_schedule.working_timeout + + app.config["ALERT_REPORTS_WORKING_TIME_OUT_LAG"] + ) + async_options["soft_time_limit"] = ( + active_schedule.working_timeout + + app.config["ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG"] + ) + execute.apply_async((active_schedule.id, schedule,), **async_options) @celery_app.task(name="reports.execute") diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py index fee74e5b7439d..512653ef1538c 100644 --- a/tests/reports/commands_tests.py +++ b/tests/reports/commands_tests.py @@ -100,9 +100,9 @@ def assert_log(state: str, error_message: Optional[str] = None): db.session.commit() logs = db.session.query(ReportExecutionLog).all() if state == ReportState.WORKING: - assert len(logs) == 1 - assert logs[0].error_message == error_message - assert logs[0].state == state + assert len(logs) == 2 + assert logs[1].error_message == error_message + assert logs[1].state == state return # On error we send an email if state == ReportState.ERROR: @@ -232,6 +232,17 @@ def create_report_slack_chart_working(): report_schedule.last_state = ReportState.WORKING report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0) db.session.commit() + log = ReportExecutionLog( + scheduled_dttm=report_schedule.last_eval_dttm, + start_dttm=report_schedule.last_eval_dttm, + end_dttm=report_schedule.last_eval_dttm, + state=ReportState.WORKING, + report_schedule=report_schedule, + uuid=uuid4(), + ) + db.session.add(log) + db.session.commit() + yield report_schedule cleanup_report_schedule(report_schedule) @@ -638,12 +649,11 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working): """ ExecuteReport Command: Test report schedule still working but should timed out """ - # setup screenshot mock current_time = create_report_slack_chart_working.last_eval_dttm + timedelta( seconds=create_report_slack_chart_working.working_timeout + 1 ) - with freeze_time(current_time): + with pytest.raises(ReportScheduleWorkingTimeoutError): AsyncExecuteReportScheduleCommand( test_id, create_report_slack_chart_working.id, datetime.utcnow() @@ -652,9 +662,10 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working): # Only needed for MySQL, understand why db.session.commit() logs = db.session.query(ReportExecutionLog).all() - assert len(logs) == 1 - assert logs[0].error_message == ReportScheduleWorkingTimeoutError.message - assert logs[0].state == ReportState.ERROR + # Two logs, first is created by fixture + assert len(logs) == 2 + assert logs[1].error_message == ReportScheduleWorkingTimeoutError.message + assert logs[1].state == ReportState.ERROR assert create_report_slack_chart_working.last_state == ReportState.ERROR diff --git a/tests/reports/scheduler_tests.py b/tests/reports/scheduler_tests.py new file mode 100644 index 0000000000000..afbaeec9056a4 --- /dev/null +++ b/tests/reports/scheduler_tests.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import List +from unittest.mock import patch + +import pytest +from freezegun import freeze_time +from freezegun.api import FakeDatetime # type: ignore + +from superset.extensions import db +from superset.models.reports import ReportScheduleType +from superset.tasks.scheduler import cron_schedule_window, scheduler +from tests.reports.utils import insert_report_schedule +from tests.test_app import app + + +@pytest.mark.parametrize( + "current_dttm, cron, excepted", + [ + ("2020-01-01T08:59:01Z", "0 9 * * *", []), + ("2020-01-01T08:59:02Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]), + ("2020-01-01T08:59:59Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]), + ("2020-01-01T09:00:00Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]), + ("2020-01-01T09:00:01Z", "0 9 * * *", []), + ], +) +def test_cron_schedule_window( + current_dttm: str, cron: str, excepted: List[FakeDatetime] +): + """ + Reports scheduler: Test cron schedule window + """ + with app.app_context(): + + with freeze_time(current_dttm): + datetimes = cron_schedule_window(cron) + assert list(datetimes) == excepted + + +@patch("superset.tasks.scheduler.execute.apply_async") +def test_scheduler_celery_timeout(execute_mock): + """ + Reports scheduler: Test scheduler setting celery soft and hard timeout + """ + with app.app_context(): + + report_schedule = insert_report_schedule( + type=ReportScheduleType.ALERT, name=f"report", crontab=f"0 9 * * *", + ) + + with freeze_time("2020-01-01T09:00:00Z"): + scheduler() + assert execute_mock.call_args[1]["soft_time_limit"] == 3601 + assert execute_mock.call_args[1]["time_limit"] == 3610 + db.session.delete(report_schedule) + db.session.commit() + + +@patch("superset.tasks.scheduler.execute.apply_async") +def test_scheduler_celery_no_timeout(execute_mock): + """ + Reports scheduler: Test scheduler setting celery soft and hard timeout + """ + with app.app_context(): + app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] = False + report_schedule = insert_report_schedule( + type=ReportScheduleType.ALERT, name=f"report", crontab=f"0 9 * * *", + ) + + with freeze_time("2020-01-01T09:00:00Z"): + scheduler() + assert execute_mock.call_args[1] == {"eta": FakeDatetime(2020, 1, 1, 9, 0)} + db.session.delete(report_schedule) + db.session.commit() diff --git a/tests/superset_test_config.py b/tests/superset_test_config.py index 651ee35959020..0ce8909a817ec 100644 --- a/tests/superset_test_config.py +++ b/tests/superset_test_config.py @@ -97,6 +97,8 @@ def GET_FEATURE_FLAGS_FUNC(ff): GLOBAL_ASYNC_QUERIES_JWT_SECRET = "test-secret-change-me-test-secret-change-me" +ALERT_REPORTS_WORKING_TIME_OUT_KILL = True + class CeleryConfig(object): BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_CELERY_DB}"