Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: working timeout with celery kill and logic fix #13911

Merged
merged 9 commits into from
Apr 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,14 @@ class CeleryConfig: # pylint: disable=too-few-public-methods
# Used for Alerts/Reports (Feature flag ALERT_REPORTS) to set the size for the
# sliding cron window size, should be synced with the celery beat config minus 1 second
ALERT_REPORTS_CRON_WINDOW_SIZE = 59
ALERT_REPORTS_WORKING_TIME_OUT_KILL = True
# if ALERT_REPORTS_WORKING_TIME_OUT_KILL is True, set a celery hard timeout
# Equal to working timeout + ALERT_REPORTS_WORKING_TIME_OUT_LAG
ALERT_REPORTS_WORKING_TIME_OUT_LAG = 10
# if ALERT_REPORTS_WORKING_TIME_OUT_KILL is True, set a celery soft timeout
# Equal to working timeout + ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG
ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG = 1

# A custom prefix to use on all Alerts & Reports emails
EMAIL_REPORTS_SUBJECT_PREFIX = "[Report] "

Expand Down
11 changes: 10 additions & 1 deletion superset/reports/commands/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import json
import logging
from operator import eq, ge, gt, le, lt, ne
from timeit import default_timer
from typing import Optional

import numpy as np
Expand Down Expand Up @@ -145,7 +146,15 @@ def _execute_query(self) -> pd.DataFrame:
limited_rendered_sql = self._report_schedule.database.apply_limit_to_sql(
rendered_sql, ALERT_SQL_LIMIT
)
return self._report_schedule.database.get_df(limited_rendered_sql)
start = default_timer()
df = self._report_schedule.database.get_df(limited_rendered_sql)
stop = default_timer()
logger.info(
"Query for %s took %.2f ms",
self._report_schedule.name,
(stop - start) * 1000.0,
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a log info with the time a query took to execute

return df
except SoftTimeLimitExceeded:
raise AlertQueryTimeout()
except Exception as ex:
Expand Down
7 changes: 6 additions & 1 deletion superset/reports/commands/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,17 @@ def is_on_working_timeout(self) -> bool:
"""
Checks if an alert is on a working timeout
"""
last_working = ReportScheduleDAO.find_last_entered_working_log(
self._report_schedule, session=self._session
)
if not last_working:
return False
return (
self._report_schedule.working_timeout is not None
and self._report_schedule.last_eval_dttm is not None
and datetime.utcnow()
- timedelta(seconds=self._report_schedule.working_timeout)
> self._report_schedule.last_eval_dttm
> last_working.end_dttm
)

def next(self) -> None:
Expand Down
19 changes: 19 additions & 0 deletions superset/reports/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,25 @@ def find_last_success_log(
.first()
)

@staticmethod
def find_last_entered_working_log(
    report_schedule: ReportSchedule, session: Optional[Session] = None,
) -> Optional[ReportExecutionLog]:
    """
    Finds the most recent error-free WORKING execution log for a given report

    :param report_schedule: The report schedule whose execution logs are searched
    :param session: Optional session to query with, falls back to `db.session`
    :return: The matching log with the latest `end_dttm`, or None if there is none
    """
    active_session = session or db.session
    # Only logs in the WORKING state that carry no error message qualify
    working_logs = active_session.query(ReportExecutionLog).filter(
        ReportExecutionLog.state == ReportState.WORKING,
        ReportExecutionLog.report_schedule == report_schedule,
        ReportExecutionLog.error_message.is_(None),
    )
    newest_first = working_logs.order_by(ReportExecutionLog.end_dttm.desc())
    return newest_first.first()

@staticmethod
def find_last_error_notification(
report_schedule: ReportSchedule, session: Optional[Session] = None,
Expand Down
15 changes: 14 additions & 1 deletion superset/tasks/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,20 @@ def scheduler() -> None:
logger.info(
"Scheduling alert %s eta: %s", active_schedule.name, schedule
)
execute.apply_async((active_schedule.id, schedule,), eta=schedule)
async_options = {"eta": schedule}
if (
active_schedule.working_timeout is not None
and app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"]
):
async_options["time_limit"] = (
active_schedule.working_timeout
+ app.config["ALERT_REPORTS_WORKING_TIME_OUT_LAG"]
)
async_options["soft_time_limit"] = (
active_schedule.working_timeout
+ app.config["ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG"]
)
execute.apply_async((active_schedule.id, schedule,), **async_options)


@celery_app.task(name="reports.execute")
Expand Down
27 changes: 19 additions & 8 deletions tests/reports/commands_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ def assert_log(state: str, error_message: Optional[str] = None):
db.session.commit()
logs = db.session.query(ReportExecutionLog).all()
if state == ReportState.WORKING:
assert len(logs) == 1
assert logs[0].error_message == error_message
assert logs[0].state == state
assert len(logs) == 2
assert logs[1].error_message == error_message
assert logs[1].state == state
return
# On error we send an email
if state == ReportState.ERROR:
Expand Down Expand Up @@ -232,6 +232,17 @@ def create_report_slack_chart_working():
report_schedule.last_state = ReportState.WORKING
report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0)
db.session.commit()
log = ReportExecutionLog(
scheduled_dttm=report_schedule.last_eval_dttm,
start_dttm=report_schedule.last_eval_dttm,
end_dttm=report_schedule.last_eval_dttm,
state=ReportState.WORKING,
report_schedule=report_schedule,
uuid=uuid4(),
)
db.session.add(log)
db.session.commit()

yield report_schedule

cleanup_report_schedule(report_schedule)
Expand Down Expand Up @@ -638,12 +649,11 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working):
"""
ExecuteReport Command: Test report schedule still working but should have timed out
"""
# setup screenshot mock
current_time = create_report_slack_chart_working.last_eval_dttm + timedelta(
seconds=create_report_slack_chart_working.working_timeout + 1
)

with freeze_time(current_time):

with pytest.raises(ReportScheduleWorkingTimeoutError):
AsyncExecuteReportScheduleCommand(
test_id, create_report_slack_chart_working.id, datetime.utcnow()
Expand All @@ -652,9 +662,10 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working):
# Only needed for MySQL, understand why
db.session.commit()
logs = db.session.query(ReportExecutionLog).all()
assert len(logs) == 1
assert logs[0].error_message == ReportScheduleWorkingTimeoutError.message
assert logs[0].state == ReportState.ERROR
# Two logs, first is created by fixture
assert len(logs) == 2
assert logs[1].error_message == ReportScheduleWorkingTimeoutError.message
assert logs[1].state == ReportState.ERROR

assert create_report_slack_chart_working.last_state == ReportState.ERROR

Expand Down
88 changes: 88 additions & 0 deletions tests/reports/scheduler_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List
from unittest.mock import patch

import pytest
from freezegun import freeze_time
from freezegun.api import FakeDatetime # type: ignore

from superset.extensions import db
from superset.models.reports import ReportScheduleType
from superset.tasks.scheduler import cron_schedule_window, scheduler
from tests.reports.utils import insert_report_schedule
from tests.test_app import app


@pytest.mark.parametrize(
    "current_dttm, cron, excepted",
    [
        ("2020-01-01T08:59:01Z", "0 9 * * *", []),
        ("2020-01-01T08:59:02Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
        ("2020-01-01T08:59:59Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
        ("2020-01-01T09:00:00Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
        ("2020-01-01T09:00:01Z", "0 9 * * *", []),
    ],
)
def test_cron_schedule_window(
    current_dttm: str, cron: str, excepted: List[FakeDatetime]
):
    """
    Reports scheduler: Test cron schedule window

    The cases probe both edges of the window around the 09:00 schedule: the
    schedule is expected from 08:59:02 up to 09:00:00 and is not expected one
    second before or after that range.
    """
    with app.app_context(), freeze_time(current_dttm):
        # Materialize while time is frozen and the app context is active
        window = list(cron_schedule_window(cron))
    assert window == excepted


@patch("superset.tasks.scheduler.execute.apply_async")
def test_scheduler_celery_timeout(execute_mock):
    """
    Reports scheduler: Test scheduler setting celery soft and hard timeout

    :param execute_mock: Mock replacing `execute.apply_async`, injected by `patch`
    """
    with app.app_context():
        report_schedule = insert_report_schedule(
            type=ReportScheduleType.ALERT, name="report", crontab="0 9 * * *",
        )
        try:
            with freeze_time("2020-01-01T09:00:00Z"):
                scheduler()
            # call_args[1] holds the keyword arguments of the last apply_async call
            async_options = execute_mock.call_args[1]
            # NOTE(review): 3601 / 3610 presumably come from a 3600s default
            # working timeout on the inserted schedule plus the soft (1s) and
            # hard (10s) lags -- confirm against insert_report_schedule
            assert async_options["soft_time_limit"] == 3601
            assert async_options["time_limit"] == 3610
        finally:
            # Always remove the schedule so a failing assertion does not leave
            # a row behind for other tests to pick up
            db.session.delete(report_schedule)
            db.session.commit()


@patch("superset.tasks.scheduler.execute.apply_async")
def test_scheduler_celery_no_timeout(execute_mock):
    """
    Reports scheduler: Test scheduler not setting celery timeouts when
    ALERT_REPORTS_WORKING_TIME_OUT_KILL is disabled

    :param execute_mock: Mock replacing `execute.apply_async`, injected by `patch`
    """
    # patch.dict restores the original config value on exit, so disabling the
    # kill switch here cannot leak into tests that run afterwards
    with app.app_context(), patch.dict(
        app.config, {"ALERT_REPORTS_WORKING_TIME_OUT_KILL": False}
    ):
        report_schedule = insert_report_schedule(
            type=ReportScheduleType.ALERT, name="report", crontab="0 9 * * *",
        )
        try:
            with freeze_time("2020-01-01T09:00:00Z"):
                scheduler()
            # Only the eta may be passed: no time_limit / soft_time_limit keys
            assert execute_mock.call_args[1] == {
                "eta": FakeDatetime(2020, 1, 1, 9, 0)
            }
        finally:
            # Always remove the schedule so a failing assertion does not leave
            # a row behind for other tests to pick up
            db.session.delete(report_schedule)
            db.session.commit()
2 changes: 2 additions & 0 deletions tests/superset_test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ def GET_FEATURE_FLAGS_FUNC(ff):

GLOBAL_ASYNC_QUERIES_JWT_SECRET = "test-secret-change-me-test-secret-change-me"

ALERT_REPORTS_WORKING_TIME_OUT_KILL = True


class CeleryConfig(object):
BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_CELERY_DB}"
Expand Down