Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve the reliability of alerts & reports #25239

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,10 @@ class D3Format(TypedDict, total=False):
[86400, "24 hours"],
]

# This is used as a workaround for the alerts & reports scheduler task to get the time
# celery beat triggered it, see https://github.com/celery/celery/issues/6974 for details
CELERY_BEAT_SCHEDULER_EXPIRES = timedelta(weeks=1)
jfrag1 marked this conversation as resolved.
Show resolved Hide resolved

# Default celery config is to use SQLA as a broker, in a production setting
# you'll want to use a proper broker as specified here:
# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html
Expand Down Expand Up @@ -942,6 +946,7 @@ class CeleryConfig: # pylint: disable=too-few-public-methods
"reports.scheduler": {
"task": "reports.scheduler",
"schedule": crontab(minute="*", hour="*"),
"options": {"expires": int(CELERY_BEAT_SCHEDULER_EXPIRES.total_seconds())},
},
"reports.prune_log": {
"task": "reports.prune_log",
Expand Down
14 changes: 7 additions & 7 deletions superset/tasks/cron_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import logging
from collections.abc import Iterator
from datetime import datetime, timedelta, timezone as dt_timezone
from datetime import datetime, timedelta

from croniter import croniter
from pytz import timezone as pytz_timezone, UnknownTimeZoneError
Expand All @@ -27,10 +27,10 @@
logger = logging.getLogger(__name__)


def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
def cron_schedule_window(
triggered_at: datetime, cron: str, timezone: str
) -> Iterator[datetime]:
window_size = app.config["ALERT_REPORTS_CRON_WINDOW_SIZE"]
# create a time-aware datetime in utc
time_now = datetime.now(tz=dt_timezone.utc)
try:
tz = pytz_timezone(timezone)
except UnknownTimeZoneError:
Expand All @@ -39,9 +39,9 @@ def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
logger.warning("Timezone %s was invalid. Falling back to 'UTC'", timezone)
utc = pytz_timezone("UTC")
# convert the current time to the user's local time for comparison
time_now = time_now.astimezone(tz)
start_at = time_now - timedelta(seconds=1)
stop_at = time_now + timedelta(seconds=window_size)
time_now = triggered_at.astimezone(tz)
start_at = time_now - timedelta(seconds=window_size / 2)
stop_at = time_now + timedelta(seconds=window_size / 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use triggered_at, which could be 2 minutes in the past, won't this stop_at (30 seconds) prevent the schedule from being executed? See line 47.

I think we just need to set start_at to 1 second before triggered_at, the reason being that cron expressions can support second-level granularity. If the cron job runs every second and start_at is 30 seconds in the past, we will actually execute 30 tasks before the actual start time.

And set stop_at to window_size after current_time, although I am not sure why we need stop_at at all.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the stop_at will prevent anything we want to run from running. Say the job is triggered at 12:00, but run at 12:02, and the cron says the job should run at 12:00. start_at will be ~11:59:30, and stop_at will be ~12:00:30, so it will correctly find the 12:00 schedule since it's in the window.

Also I don't believe cron supports second level granularity, at least not that I've seen in Superset. IMO for Superset's use cases, getting the reliability right is much more important than supporting that level of granularity

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Cron works at minute-level granularity, and stop_at is compared to triggered_at 😂

Comment on lines +43 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The window approach seems fundamentally brittle to me. Why can't we set start_at to the last time the task was scheduled to run instead, and keep iterating until we hit the current time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this could lead to a situation where an alert/report is triggered more times than desired. For example, if a report runs at 12:00:00, and the celery queue is backed up from 11:55:00-12:00:01, all 5-6 queued scheduler jobs could run one after another, and each would trigger the report.

With the approach in this PR, the windows in this situation would be 11:54:30-11:55:30, 11:55:30-11:56:30, etc., so it should ensure we neither miss any scheduled alerts & reports nor run them more times than desired.

Let me know if I'm missing something/there's something I'm not considering

crons = croniter(cron, start_at)
for schedule in crons.all_next(datetime):
if schedule >= stop_at:
Expand Down
9 changes: 8 additions & 1 deletion superset/tasks/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
Expand Down Expand Up @@ -47,9 +48,15 @@ def scheduler() -> None:
return
with session_scope(nullpool=True) as session:
active_schedules = ReportScheduleDAO.find_active(session)
triggered_at = (
datetime.fromisoformat(scheduler.request.expires)
- app.config["CELERY_BEAT_SCHEDULER_EXPIRES"]
Comment on lines +52 to +53
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not your fault, but having to do it like this just makes me cringe.. 😄 I'm hoping someone can add request.scheduled_at to Celery 😉

Comment on lines +52 to +53

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, I've been checking this code. How does this solve the issue? It seems to select datetime.utcnow() because datetime.fromisoformat(scheduler.request.expires) - app.config["CELERY_BEAT_SCHEDULER_EXPIRES"] = 0.
Can you help me understand, @jfrag1?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@piyushdatazip I'm not sure I fully understand what you're asking, but datetime.fromisoformat(scheduler.request.expires) is a datetime, and app.config["CELERY_BEAT_SCHEDULER_EXPIRES"] is a timedelta, so the result of this subtraction is a datetime, not 0.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for replying, @jfrag1. My bad, I misunderstood it at first.

if scheduler.request.expires
else datetime.utcnow()
)
for active_schedule in active_schedules:
for schedule in cron_schedule_window(
active_schedule.crontab, active_schedule.timezone
triggered_at, active_schedule.crontab, active_schedule.timezone
):
logger.info(
"Scheduling alert %s eta: %s", active_schedule.name, schedule
Expand Down
145 changes: 86 additions & 59 deletions tests/unit_tests/tasks/test_cron_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime

import pytest
import pytz
from dateutil import parser
from freezegun import freeze_time
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used elsewhere? If not we should remove the freezegun library.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's still used elsewhere

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love freezegun, hands off @john-bodley! 😄

from freezegun.api import FakeDatetime

from superset.tasks.cron_util import cron_schedule_window
Expand All @@ -27,23 +25,28 @@
@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-01-01T08:59:01Z", "0 1 * * *", []),
("2020-01-01T08:59:01+00:00", "0 1 * * *", []),
(
"2020-01-01T08:59:02Z",
"2020-01-01T08:59:32+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T08:59:59Z",
"2020-01-01T08:59:59+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T09:00:00Z",
"2020-01-01T09:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T09:00:01Z", "0 1 * * *", []),
(
"2020-01-01T09:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T09:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_los_angeles(
Expand All @@ -53,34 +56,40 @@ def test_cron_schedule_window_los_angeles(
Reports scheduler: Test cron schedule window for "America/Los_Angeles"
"""

with freeze_time(current_dttm):
datetimes = cron_schedule_window(cron, "America/Los_Angeles")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
datetimes = cron_schedule_window(
datetime.fromisoformat(current_dttm), cron, "America/Los_Angeles"
)
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
)


@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-01-01T00:59:01Z", "0 1 * * *", []),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we removing the tests which use the zone designator for the zero UTC offset?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're not removed, I just changed the timestamp format to ISO (zero UTC offset represented by +00:00). The format used is inconsequential to the cron_schedule_window function being tested, since it's just passed a datetime object.

("2020-01-01T00:59:01+00:00", "0 1 * * *", []),
("2020-01-01T00:59:02+00:00", "0 1 * * *", []),
(
"2020-01-01T00:59:59+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T00:59:02Z",
"2020-01-01T01:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T00:59:59Z",
"2020-01-01T01:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T01:00:00Z",
"2020-01-01T01:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T01:00:01Z", "0 1 * * *", []),
("2020-01-01T01:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_invalid_timezone(
Expand All @@ -90,35 +99,41 @@ def test_cron_schedule_window_invalid_timezone(
Reports scheduler: Test cron schedule window for "invalid timezone"
"""

with freeze_time(current_dttm):
datetimes = cron_schedule_window(cron, "invalid timezone")
# it should default to UTC
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
datetimes = cron_schedule_window(
datetime.fromisoformat(current_dttm), cron, "invalid timezone"
)
# it should default to UTC
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
)


@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-01-01T05:59:01Z", "0 1 * * *", []),
("2020-01-01T05:59:01+00:00", "0 1 * * *", []),
("2020-01-01T05:59:02+00:00", "0 1 * * *", []),
(
"2020-01-01T05:59:59+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T05:59:02Z",
"2020-01-01T06:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T5:59:59Z",
"2020-01-01T06:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T6:00:00",
"2020-01-01T06:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T6:00:01Z", "0 1 * * *", []),
("2020-01-01T06:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_new_york(
Expand All @@ -128,34 +143,40 @@ def test_cron_schedule_window_new_york(
Reports scheduler: Test cron schedule window for "America/New_York"
"""

with freeze_time(current_dttm, tz_offset=0):
datetimes = cron_schedule_window(cron, "America/New_York")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
datetimes = cron_schedule_window(
datetime.fromisoformat(current_dttm), cron, "America/New_York"
)
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
)


@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-01-01T06:59:01Z", "0 1 * * *", []),
("2020-01-01T06:59:01+00:00", "0 1 * * *", []),
("2020-01-01T06:59:02+00:00", "0 1 * * *", []),
(
"2020-01-01T06:59:59+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T06:59:02Z",
"2020-01-01T07:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T06:59:59Z",
"2020-01-01T07:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T07:00:00",
"2020-01-01T07:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T07:00:01Z", "0 1 * * *", []),
("2020-01-01T07:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_chicago(
Expand All @@ -165,34 +186,40 @@ def test_cron_schedule_window_chicago(
Reports scheduler: Test cron schedule window for "America/Chicago"
"""

with freeze_time(current_dttm, tz_offset=0):
datetimes = cron_schedule_window(cron, "America/Chicago")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
datetimes = cron_schedule_window(
datetime.fromisoformat(current_dttm), cron, "America/Chicago"
)
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
)


@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-07-01T05:59:01Z", "0 1 * * *", []),
("2020-07-01T05:59:01+00:00", "0 1 * * *", []),
("2020-07-01T05:59:02+00:00", "0 1 * * *", []),
(
"2020-07-01T05:59:59+00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-07-01T05:59:02Z",
"2020-07-01T06:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-07-01T05:59:59Z",
"2020-07-01T06:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-07-01T06:00:00",
"2020-07-01T06:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-07-01T06:00:01Z", "0 1 * * *", []),
("2020-07-01T06:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_chicago_daylight(
Expand All @@ -202,9 +229,9 @@ def test_cron_schedule_window_chicago_daylight(
Reports scheduler: Test cron schedule window for "America/Chicago"
"""

with freeze_time(current_dttm, tz_offset=0):
datetimes = cron_schedule_window(cron, "America/Chicago")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
datetimes = cron_schedule_window(
datetime.fromisoformat(current_dttm), cron, "America/Chicago"
)
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
)
Loading