Skip to content

Commit

Permalink
fix(alerts/reports): working timeout with celery kill and logic fix (a…
Browse files Browse the repository at this point in the history
…pache#13911)

* fix: working timeout with celery kill and logic fix

* add config flags

* fix typo

* fix python lint

* log query time for alerts

* add tests

* fix lint
  • Loading branch information
dpgaspar authored Apr 8, 2021
1 parent 771641b commit fdcb42a
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 11 deletions.
8 changes: 8 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,14 @@ class CeleryConfig: # pylint: disable=too-few-public-methods
# Used for Alerts/Reports (Feature flask ALERT_REPORTS) to set the size for the
# sliding cron window size, should be synced with the celery beat config minus 1 second
ALERT_REPORTS_CRON_WINDOW_SIZE = 59
ALERT_REPORTS_WORKING_TIME_OUT_KILL = True
# if ALERT_REPORTS_WORKING_TIME_OUT_KILL is True, set a celery hard timeout
# Equal to working timeout + ALERT_REPORTS_WORKING_TIME_OUT_LAG
ALERT_REPORTS_WORKING_TIME_OUT_LAG = 10
# if ALERT_REPORTS_WORKING_TIME_OUT_KILL is True, set a celery hard timeout
# Equal to working timeout + ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG
ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG = 1

# A custom prefix to use on all Alerts & Reports emails
EMAIL_REPORTS_SUBJECT_PREFIX = "[Report] "

Expand Down
11 changes: 10 additions & 1 deletion superset/reports/commands/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import json
import logging
from operator import eq, ge, gt, le, lt, ne
from timeit import default_timer
from typing import Optional

import numpy as np
Expand Down Expand Up @@ -145,7 +146,15 @@ def _execute_query(self) -> pd.DataFrame:
limited_rendered_sql = self._report_schedule.database.apply_limit_to_sql(
rendered_sql, ALERT_SQL_LIMIT
)
return self._report_schedule.database.get_df(limited_rendered_sql)
start = default_timer()
df = self._report_schedule.database.get_df(limited_rendered_sql)
stop = default_timer()
logger.info(
"Query for %s took %.2f ms",
self._report_schedule.name,
(stop - start) * 1000.0,
)
return df
except SoftTimeLimitExceeded:
raise AlertQueryTimeout()
except Exception as ex:
Expand Down
7 changes: 6 additions & 1 deletion superset/reports/commands/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,17 @@ def is_on_working_timeout(self) -> bool:
"""
Checks if an alert is on a working timeout
"""
last_working = ReportScheduleDAO.find_last_entered_working_log(
self._report_schedule, session=self._session
)
if not last_working:
return False
return (
self._report_schedule.working_timeout is not None
and self._report_schedule.last_eval_dttm is not None
and datetime.utcnow()
- timedelta(seconds=self._report_schedule.working_timeout)
> self._report_schedule.last_eval_dttm
> last_working.end_dttm
)

def next(self) -> None:
Expand Down
19 changes: 19 additions & 0 deletions superset/reports/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,25 @@ def find_last_success_log(
.first()
)

@staticmethod
def find_last_entered_working_log(
report_schedule: ReportSchedule, session: Optional[Session] = None,
) -> Optional[ReportExecutionLog]:
"""
Finds last success execution log for a given report
"""
session = session or db.session
return (
session.query(ReportExecutionLog)
.filter(
ReportExecutionLog.state == ReportState.WORKING,
ReportExecutionLog.report_schedule == report_schedule,
ReportExecutionLog.error_message.is_(None),
)
.order_by(ReportExecutionLog.end_dttm.desc())
.first()
)

@staticmethod
def find_last_error_notification(
report_schedule: ReportSchedule, session: Optional[Session] = None,
Expand Down
15 changes: 14 additions & 1 deletion superset/tasks/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,20 @@ def scheduler() -> None:
logger.info(
"Scheduling alert %s eta: %s", active_schedule.name, schedule
)
execute.apply_async((active_schedule.id, schedule,), eta=schedule)
async_options = {"eta": schedule}
if (
active_schedule.working_timeout is not None
and app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"]
):
async_options["time_limit"] = (
active_schedule.working_timeout
+ app.config["ALERT_REPORTS_WORKING_TIME_OUT_LAG"]
)
async_options["soft_time_limit"] = (
active_schedule.working_timeout
+ app.config["ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG"]
)
execute.apply_async((active_schedule.id, schedule,), **async_options)


@celery_app.task(name="reports.execute")
Expand Down
27 changes: 19 additions & 8 deletions tests/reports/commands_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ def assert_log(state: str, error_message: Optional[str] = None):
db.session.commit()
logs = db.session.query(ReportExecutionLog).all()
if state == ReportState.WORKING:
assert len(logs) == 1
assert logs[0].error_message == error_message
assert logs[0].state == state
assert len(logs) == 2
assert logs[1].error_message == error_message
assert logs[1].state == state
return
# On error we send an email
if state == ReportState.ERROR:
Expand Down Expand Up @@ -232,6 +232,17 @@ def create_report_slack_chart_working():
report_schedule.last_state = ReportState.WORKING
report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0)
db.session.commit()
log = ReportExecutionLog(
scheduled_dttm=report_schedule.last_eval_dttm,
start_dttm=report_schedule.last_eval_dttm,
end_dttm=report_schedule.last_eval_dttm,
state=ReportState.WORKING,
report_schedule=report_schedule,
uuid=uuid4(),
)
db.session.add(log)
db.session.commit()

yield report_schedule

cleanup_report_schedule(report_schedule)
Expand Down Expand Up @@ -638,12 +649,11 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working):
"""
ExecuteReport Command: Test report schedule still working but should timed out
"""
# setup screenshot mock
current_time = create_report_slack_chart_working.last_eval_dttm + timedelta(
seconds=create_report_slack_chart_working.working_timeout + 1
)

with freeze_time(current_time):

with pytest.raises(ReportScheduleWorkingTimeoutError):
AsyncExecuteReportScheduleCommand(
test_id, create_report_slack_chart_working.id, datetime.utcnow()
Expand All @@ -652,9 +662,10 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working):
# Only needed for MySQL, understand why
db.session.commit()
logs = db.session.query(ReportExecutionLog).all()
assert len(logs) == 1
assert logs[0].error_message == ReportScheduleWorkingTimeoutError.message
assert logs[0].state == ReportState.ERROR
# Two logs, first is created by fixture
assert len(logs) == 2
assert logs[1].error_message == ReportScheduleWorkingTimeoutError.message
assert logs[1].state == ReportState.ERROR

assert create_report_slack_chart_working.last_state == ReportState.ERROR

Expand Down
88 changes: 88 additions & 0 deletions tests/reports/scheduler_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List
from unittest.mock import patch

import pytest
from freezegun import freeze_time
from freezegun.api import FakeDatetime # type: ignore

from superset.extensions import db
from superset.models.reports import ReportScheduleType
from superset.tasks.scheduler import cron_schedule_window, scheduler
from tests.reports.utils import insert_report_schedule
from tests.test_app import app


@pytest.mark.parametrize(
"current_dttm, cron, excepted",
[
("2020-01-01T08:59:01Z", "0 9 * * *", []),
("2020-01-01T08:59:02Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
("2020-01-01T08:59:59Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
("2020-01-01T09:00:00Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
("2020-01-01T09:00:01Z", "0 9 * * *", []),
],
)
def test_cron_schedule_window(
current_dttm: str, cron: str, excepted: List[FakeDatetime]
):
"""
Reports scheduler: Test cron schedule window
"""
with app.app_context():

with freeze_time(current_dttm):
datetimes = cron_schedule_window(cron)
assert list(datetimes) == excepted


@patch("superset.tasks.scheduler.execute.apply_async")
def test_scheduler_celery_timeout(execute_mock):
"""
Reports scheduler: Test scheduler setting celery soft and hard timeout
"""
with app.app_context():

report_schedule = insert_report_schedule(
type=ReportScheduleType.ALERT, name=f"report", crontab=f"0 9 * * *",
)

with freeze_time("2020-01-01T09:00:00Z"):
scheduler()
assert execute_mock.call_args[1]["soft_time_limit"] == 3601
assert execute_mock.call_args[1]["time_limit"] == 3610
db.session.delete(report_schedule)
db.session.commit()


@patch("superset.tasks.scheduler.execute.apply_async")
def test_scheduler_celery_no_timeout(execute_mock):
"""
Reports scheduler: Test scheduler setting celery soft and hard timeout
"""
with app.app_context():
app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] = False
report_schedule = insert_report_schedule(
type=ReportScheduleType.ALERT, name=f"report", crontab=f"0 9 * * *",
)

with freeze_time("2020-01-01T09:00:00Z"):
scheduler()
assert execute_mock.call_args[1] == {"eta": FakeDatetime(2020, 1, 1, 9, 0)}
db.session.delete(report_schedule)
db.session.commit()
2 changes: 2 additions & 0 deletions tests/superset_test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ def GET_FEATURE_FLAGS_FUNC(ff):

GLOBAL_ASYNC_QUERIES_JWT_SECRET = "test-secret-change-me-test-secret-change-me"

ALERT_REPORTS_WORKING_TIME_OUT_KILL = True


class CeleryConfig(object):
BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_CELERY_DB}"
Expand Down

0 comments on commit fdcb42a

Please sign in to comment.