From 3689156e26318d1bd76702c9e87b2e2562858c8a Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 1 Apr 2021 12:47:46 +0100 Subject: [PATCH 1/9] fix: working timeout with celery kill and logic fix --- superset/reports/commands/execute.py | 7 ++++++- superset/reports/dao.py | 19 ++++++++++++++++++ superset/tasks/scheduler.py | 8 +++++++- tests/reports/commands_tests.py | 29 +++++++++++++++++++--------- 4 files changed, 52 insertions(+), 11 deletions(-) diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py index c3a3d6765f0d9..e3352d241f95b 100644 --- a/superset/reports/commands/execute.py +++ b/superset/reports/commands/execute.py @@ -280,12 +280,17 @@ def is_on_working_timeout(self) -> bool: """ Checks if an alert is on a working timeout """ + last_working = ReportScheduleDAO.find_last_entered_working_log( + self._report_schedule, session=self._session + ) + if not last_working: + return False return ( self._report_schedule.working_timeout is not None and self._report_schedule.last_eval_dttm is not None and datetime.utcnow() - timedelta(seconds=self._report_schedule.working_timeout) - > self._report_schedule.last_eval_dttm + > last_working.end_dttm ) def next(self) -> None: diff --git a/superset/reports/dao.py b/superset/reports/dao.py index 5ffa0f0f10aa2..30fd68eec2771 100644 --- a/superset/reports/dao.py +++ b/superset/reports/dao.py @@ -226,6 +226,25 @@ def find_last_success_log( .first() ) + @staticmethod + def find_last_entered_working_log( + report_schedule: ReportSchedule, session: Optional[Session] = None, + ) -> Optional[ReportExecutionLog]: + """ + Finds last success execution log for a given report + """ + session = session or db.session + return ( + session.query(ReportExecutionLog) + .filter( + ReportExecutionLog.state == ReportState.WORKING, + ReportExecutionLog.report_schedule == report_schedule, + ReportExecutionLog.error_message == None, + ) + .order_by(ReportExecutionLog.end_dttm.desc()) + .first() + ) + @staticmethod def find_last_error_notification( report_schedule: ReportSchedule, session: Optional[Session] = None, diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 73005e77548e3..4874cf037d174 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -57,7 +57,13 @@ def scheduler() -> None: logger.info( "Scheduling alert %s eta: %s", active_schedule.name, schedule ) - execute.apply_async((active_schedule.id, schedule,), eta=schedule) + async_options = {"eta": schedule} + if active_schedule.working_timeout is not None: + async_options["time_limit"] = active_schedule.working_timeout + 10 + async_options["soft_time_limit"] = ( + active_schedule.working_timeout + 1 + ) + execute.apply_async((active_schedule.id, schedule,), **async_options) @celery_app.task(name="reports.execute") diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py index fee74e5b7439d..5f91202badb16 100644 --- a/tests/reports/commands_tests.py +++ b/tests/reports/commands_tests.py @@ -100,9 +100,9 @@ def assert_log(state: str, error_message: Optional[str] = None): db.session.commit() logs = db.session.query(ReportExecutionLog).all() if state == ReportState.WORKING: - assert len(logs) == 1 - assert logs[0].error_message == error_message - assert logs[0].state == state + assert len(logs) == 2 + assert logs[1].error_message == error_message + assert logs[1].state == state return # On error we send an email if state == ReportState.ERROR: @@ -232,6 +232,17 @@ def create_report_slack_chart_working(): report_schedule.last_state = ReportState.WORKING report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0) db.session.commit() + log = ReportExecutionLog( + scheduled_dttm=report_schedule.last_eval_dttm, + start_dttm=report_schedule.last_eval_dttm, + end_dttm=report_schedule.last_eval_dttm, + state=ReportState.WORKING, + report_schedule=report_schedule, + uuid=uuid4(), + ) + db.session.add(log) + db.session.commit() + yield report_schedule cleanup_report_schedule(report_schedule) @@ -638,12 +649,11 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working): """ ExecuteReport Command: Test report schedule still working but should timed out """ - # setup screenshot mock current_time = create_report_slack_chart_working.last_eval_dttm + timedelta( - seconds=create_report_slack_chart_working.working_timeout + 1 + seconds=create_report_slack_chart_working.working_timeout + 1000 ) - with freeze_time(current_time): + with pytest.raises(ReportScheduleWorkingTimeoutError): AsyncExecuteReportScheduleCommand( test_id, create_report_slack_chart_working.id, datetime.utcnow() @@ -652,9 +662,10 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working): # Only needed for MySQL, understand why db.session.commit() logs = db.session.query(ReportExecutionLog).all() - assert len(logs) == 1 - assert logs[0].error_message == ReportScheduleWorkingTimeoutError.message - assert logs[0].state == ReportState.ERROR + # Two logs, first is created by fixture + assert len(logs) == 2 + assert logs[1].error_message == ReportScheduleWorkingTimeoutError.message + assert logs[1].state == ReportState.ERROR assert create_report_slack_chart_working.last_state == ReportState.ERROR From 041372141406e31d7c497b24d9e2bab38162a321 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 1 Apr 2021 13:32:08 +0100 Subject: [PATCH 2/9] add config flags --- superset/config.py | 8 ++++++++ superset/tasks/scheduler.py | 13 ++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/superset/config.py b/superset/config.py index 0976e46c4efae..c5d5a6ea08b63 100644 --- a/superset/config.py +++ b/superset/config.py @@ -922,6 +922,14 @@ class CeleryConfig: # pylint: disable=too-few-public-methods # Used for Alerts/Reports (Feature flask ALERT_REPORTS) to set the size for the # sliding cron window size, should be synced with the celery beat config minus 1 second ALERT_REPORTS_CRON_WINDOW_SIZE = 59 +ALERT_REPORTS_WORKING_TIMOUT_KILL = True +# if ALERT_REPORTS_WORKING_TIMOUT_KILL is True, set a celery hard timeout +# Equal to working timeout + ALERT_REPORTS_WORKING_TIME_OUT_LAG +ALERT_REPORTS_WORKING_TIME_OUT_LAG = 10 +# if ALERT_REPORTS_WORKING_TIMOUT_KILL is True, set a celery hard timeout +# Equal to working timeout + ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG +ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG = 1 + # A custom prefix to use on all Alerts & Reports emails EMAIL_REPORTS_SUBJECT_PREFIX = "[Report] " diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 4874cf037d174..c6eee03b74294 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -58,10 +58,17 @@ def scheduler() -> None: "Scheduling alert %s eta: %s", active_schedule.name, schedule ) async_options = {"eta": schedule} - if active_schedule.working_timeout is not None: - async_options["time_limit"] = active_schedule.working_timeout + 10 + if ( + active_schedule.working_timeout is not None + and app.config["ALERT_REPORTS_WORKING_TIMOUT_KILL"] + ): + async_options["time_limit"] = ( + active_schedule.working_timeout + + app.config["ALERT_REPORTS_WORKING_TIME_OUT_LAG"] + ) async_options["soft_time_limit"] = ( - active_schedule.working_timeout + 1 + active_schedule.working_timeout + + app.config["ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG"] ) execute.apply_async((active_schedule.id, schedule,), **async_options) From bc665a9d864e9a88ea1873a553a6a7f0a531dfa2 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 1 Apr 2021 14:06:25 +0100 Subject: [PATCH 3/9] fix typo --- superset/config.py | 6 +++--- superset/tasks/scheduler.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/superset/config.py b/superset/config.py index c5d5a6ea08b63..cb9b7f65162ed 100644 --- a/superset/config.py +++ b/superset/config.py @@ -922,11 +922,11 @@ class CeleryConfig: # pylint: disable=too-few-public-methods # Used for Alerts/Reports (Feature flask ALERT_REPORTS) to set the size for the # sliding cron window size, should be synced with the celery beat config minus 1 second ALERT_REPORTS_CRON_WINDOW_SIZE = 59 -ALERT_REPORTS_WORKING_TIMOUT_KILL = True -# if ALERT_REPORTS_WORKING_TIMOUT_KILL is True, set a celery hard timeout +ALERT_REPORTS_WORKING_TIME_OUT_KILL = True +# if ALERT_REPORTS_WORKING_TIME_OUT_KILL is True, set a celery hard timeout # Equal to working timeout + ALERT_REPORTS_WORKING_TIME_OUT_LAG ALERT_REPORTS_WORKING_TIME_OUT_LAG = 10 -# if ALERT_REPORTS_WORKING_TIMOUT_KILL is True, set a celery hard timeout +# if ALERT_REPORTS_WORKING_TIME_OUT_KILL is True, set a celery hard timeout # Equal to working timeout + ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG = 1 diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index c6eee03b74294..84026aefb40c6 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -60,7 +60,7 @@ def scheduler() -> None: async_options = {"eta": schedule} if ( active_schedule.working_timeout is not None - and app.config["ALERT_REPORTS_WORKING_TIMOUT_KILL"] + and app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] ): async_options["time_limit"] = ( active_schedule.working_timeout From 76217a7dd2deb049e84ce116c6208d90e2591133 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 1 Apr 2021 15:43:06 +0100 Subject: [PATCH 4/9] fix python lint --- superset/reports/dao.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/superset/reports/dao.py b/superset/reports/dao.py index 30fd68eec2771..0f729c3d022d3 100644 --- a/superset/reports/dao.py +++ b/superset/reports/dao.py @@ -239,7 +239,7 @@ def find_last_entered_working_log( .filter( ReportExecutionLog.state == ReportState.WORKING, ReportExecutionLog.report_schedule == report_schedule, - ReportExecutionLog.error_message == None, + ReportExecutionLog.error_message.is_(None), ) .order_by(ReportExecutionLog.end_dttm.desc()) .first() From aa2438d439c154055491c0bfe9fb908e790f25b2 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Thu, 1 Apr 2021 15:51:47 +0100 Subject: [PATCH 5/9] remove test --- tests/reports/commands_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py index 5f91202badb16..512653ef1538c 100644 --- a/tests/reports/commands_tests.py +++ b/tests/reports/commands_tests.py @@ -650,7 +650,7 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working): ExecuteReport Command: Test report schedule still working but should timed out """ current_time = create_report_slack_chart_working.last_eval_dttm + timedelta( - seconds=create_report_slack_chart_working.working_timeout + 1000 + seconds=create_report_slack_chart_working.working_timeout + 1 ) with freeze_time(current_time): From b404781dd38cfb43d3946f48d71e0ac541914be6 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Mon, 5 Apr 2021 14:25:43 +0100 Subject: [PATCH 6/9] log query time for alerts --- superset/reports/commands/alert.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/superset/reports/commands/alert.py b/superset/reports/commands/alert.py index 076ebfb40973d..469e85c189fc5 100644 --- a/superset/reports/commands/alert.py +++ b/superset/reports/commands/alert.py @@ -17,6 +17,7 @@ import json import logging from operator import eq, ge, gt, le, lt, ne +from timeit import default_timer from typing import Optional import numpy as np @@ -145,7 +146,15 @@ def _execute_query(self) -> pd.DataFrame: limited_rendered_sql = self._report_schedule.database.apply_limit_to_sql( rendered_sql, ALERT_SQL_LIMIT ) - return self._report_schedule.database.get_df(limited_rendered_sql) + start = default_timer() + df = self._report_schedule.database.get_df(limited_rendered_sql) + stop = default_timer() + logger.info( + "Query for %s took %.2f ms", + self._report_schedule.name, + (stop - start) * 1000.0, + ) + return df except SoftTimeLimitExceeded: raise AlertQueryTimeout() except Exception as ex: From 6dfafd5818f21e5f0a7ba77a2489f60e19f94912 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Wed, 7 Apr 2021 10:24:45 +0100 Subject: [PATCH 7/9] add tests --- tests/reports/scheduler_tests.py | 70 ++++++++++++++++++++++++++++++++ tests/superset_test_config.py | 2 + 2 files changed, 72 insertions(+) create mode 100644 tests/reports/scheduler_tests.py diff --git a/tests/reports/scheduler_tests.py b/tests/reports/scheduler_tests.py new file mode 100644 index 0000000000000..52d2a09d66d75 --- /dev/null +++ b/tests/reports/scheduler_tests.py @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import List +from unittest.mock import patch + +import pytest +from freezegun import freeze_time +from freezegun.api import FakeDatetime # type: ignore + +from superset.extensions import db +from superset.models.reports import ReportScheduleType +from superset.tasks.scheduler import cron_schedule_window, scheduler +from tests.reports.utils import insert_report_schedule +from tests.test_app import app + + +@pytest.mark.parametrize( + "current_dttm, cron, excepted", + [ + ("2020-01-01T08:59:01Z", "0 9 * * *", []), + ("2020-01-01T08:59:02Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]), + ("2020-01-01T08:59:59Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]), + ("2020-01-01T09:00:00Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]), + ("2020-01-01T09:00:01Z", "0 9 * * *", []), + ], +) +def test_cron_schedule_window( + current_dttm: str, cron: str, excepted: List[FakeDatetime] +): + """ + Reports scheduler: Test cron schedule window + """ + with app.app_context(): + + with freeze_time(current_dttm): + datetimes = cron_schedule_window(cron) + assert list(datetimes) == excepted + + +@patch("superset.tasks.scheduler.execute.apply_async") +def test_scheduler_celery_timeout(execute_mock): + """ + Reports scheduler: Test scheduler setting celery soft and hard timeout + """ + with app.app_context(): + + report_schedule = insert_report_schedule( + type=ReportScheduleType.ALERT, name=f"report", crontab=f"0 9 * * *", + ) + + with freeze_time("2020-01-01T09:00:00Z"): + scheduler() + assert execute_mock.call_args[1]["soft_time_limit"] == 3601 + assert execute_mock.call_args[1]["time_limit"] == 3610 + db.session.delete(report_schedule) + db.session.commit() diff --git a/tests/superset_test_config.py b/tests/superset_test_config.py index 651ee35959020..0ce8909a817ec 100644 --- a/tests/superset_test_config.py +++ b/tests/superset_test_config.py @@ -97,6 +97,8 @@ def GET_FEATURE_FLAGS_FUNC(ff): GLOBAL_ASYNC_QUERIES_JWT_SECRET = "test-secret-change-me-test-secret-change-me" +ALERT_REPORTS_WORKING_TIME_OUT_KILL = True + class CeleryConfig(object): BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_CELERY_DB}" From bc9495526ad2f5db48a2eb082acca5eadd290a12 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Wed, 7 Apr 2021 14:48:32 +0100 Subject: [PATCH 8/9] add test --- tests/reports/scheduler_tests.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/reports/scheduler_tests.py b/tests/reports/scheduler_tests.py index 52d2a09d66d75..30908662fe1f1 100644 --- a/tests/reports/scheduler_tests.py +++ b/tests/reports/scheduler_tests.py @@ -68,3 +68,21 @@ def test_scheduler_celery_timeout(execute_mock): assert execute_mock.call_args[1]["time_limit"] == 3610 db.session.delete(report_schedule) db.session.commit() + + +@patch("superset.tasks.scheduler.execute.apply_async") +def test_scheduler_celery_no_timeout(execute_mock): + """ + Reports scheduler: Test scheduler setting celery soft and hard timeout + """ + with app.app_context(): + app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] = False + report_schedule = insert_report_schedule( + type=ReportScheduleType.ALERT, name=f"report", crontab=f"0 9 * * *", + ) + + with freeze_time("2020-01-01T09:00:00Z"): + scheduler() + assert execute_mock.call_args[1] == {'eta': FakeDatetime(2020, 1, 1, 9, 0)} + db.session.delete(report_schedule) + db.session.commit() From cb8a29fb41262227b009d5d7f05a281d97e289b8 Mon Sep 17 00:00:00 2001 From: Daniel Gaspar Date: Wed, 7 Apr 2021 15:33:27 +0100 Subject: [PATCH 9/9] fix lint --- tests/reports/scheduler_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/reports/scheduler_tests.py b/tests/reports/scheduler_tests.py index 30908662fe1f1..afbaeec9056a4 100644 --- a/tests/reports/scheduler_tests.py +++ b/tests/reports/scheduler_tests.py @@ -83,6 +83,6 @@ def test_scheduler_celery_no_timeout(execute_mock): with freeze_time("2020-01-01T09:00:00Z"): scheduler() - assert execute_mock.call_args[1] == {'eta': FakeDatetime(2020, 1, 1, 9, 0)} + assert execute_mock.call_args[1] == {"eta": FakeDatetime(2020, 1, 1, 9, 0)} db.session.delete(report_schedule) db.session.commit()