diff --git a/superset/config.py b/superset/config.py index 18a61575783dc..1145a7693f7c4 100644 --- a/superset/config.py +++ b/superset/config.py @@ -914,6 +914,10 @@ class D3Format(TypedDict, total=False): [86400, "24 hours"], ] +# This is used as a workaround for the alerts & reports scheduler task to get the time +# celery beat triggered it, see https://github.com/celery/celery/issues/6974 for details +CELERY_BEAT_SCHEDULER_EXPIRES = timedelta(weeks=1) + # Default celery config is to use SQLA as a broker, in a production setting # you'll want to use a proper broker as specified here: # https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html @@ -942,6 +946,7 @@ class CeleryConfig: # pylint: disable=too-few-public-methods "reports.scheduler": { "task": "reports.scheduler", "schedule": crontab(minute="*", hour="*"), + "options": {"expires": int(CELERY_BEAT_SCHEDULER_EXPIRES.total_seconds())}, }, "reports.prune_log": { "task": "reports.prune_log", diff --git a/superset/tasks/cron_util.py b/superset/tasks/cron_util.py index 19d342ebdcf86..329937fb82850 100644 --- a/superset/tasks/cron_util.py +++ b/superset/tasks/cron_util.py @@ -17,7 +17,7 @@ import logging from collections.abc import Iterator -from datetime import datetime, timedelta, timezone as dt_timezone +from datetime import datetime, timedelta from croniter import croniter from pytz import timezone as pytz_timezone, UnknownTimeZoneError @@ -27,10 +27,10 @@ logger = logging.getLogger(__name__) -def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]: +def cron_schedule_window( + triggered_at: datetime, cron: str, timezone: str +) -> Iterator[datetime]: window_size = app.config["ALERT_REPORTS_CRON_WINDOW_SIZE"] - # create a time-aware datetime in utc - time_now = datetime.now(tz=dt_timezone.utc) try: tz = pytz_timezone(timezone) except UnknownTimeZoneError: @@ -39,9 +39,9 @@ def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]: logger.warning("Timezone %s was invalid. Falling back to 'UTC'", timezone) utc = pytz_timezone("UTC") # convert the current time to the user's local time for comparison - time_now = time_now.astimezone(tz) - start_at = time_now - timedelta(seconds=1) - stop_at = time_now + timedelta(seconds=window_size) + time_now = triggered_at.astimezone(tz) + start_at = time_now - timedelta(seconds=window_size / 2) + stop_at = time_now + timedelta(seconds=window_size / 2) crons = croniter(cron, start_at) for schedule in crons.all_next(datetime): if schedule >= stop_at: diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 90df90ff1594f..f3cc270b86347 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. import logging +from datetime import datetime from celery import Celery from celery.exceptions import SoftTimeLimitExceeded @@ -47,9 +48,15 @@ def scheduler() -> None: return with session_scope(nullpool=True) as session: active_schedules = ReportScheduleDAO.find_active(session) + triggered_at = ( + datetime.fromisoformat(scheduler.request.expires) + - app.config["CELERY_BEAT_SCHEDULER_EXPIRES"] + if scheduler.request.expires + else datetime.utcnow() + ) for active_schedule in active_schedules: for schedule in cron_schedule_window( - active_schedule.crontab, active_schedule.timezone + triggered_at, active_schedule.crontab, active_schedule.timezone ): logger.info( "Scheduling alert %s eta: %s", active_schedule.name, schedule diff --git a/tests/unit_tests/tasks/test_cron_util.py b/tests/unit_tests/tasks/test_cron_util.py index 5bc22273f544e..56f1258e30b57 100644 --- a/tests/unit_tests/tasks/test_cron_util.py +++ b/tests/unit_tests/tasks/test_cron_util.py @@ -14,11 +14,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from datetime import datetime import pytest -import pytz -from dateutil import parser -from freezegun import freeze_time from freezegun.api import FakeDatetime from superset.tasks.cron_util import cron_schedule_window @@ -27,23 +25,28 @@ @pytest.mark.parametrize( "current_dttm, cron, expected", [ - ("2020-01-01T08:59:01Z", "0 1 * * *", []), + ("2020-01-01T08:59:01+00:00", "0 1 * * *", []), ( - "2020-01-01T08:59:02Z", + "2020-01-01T08:59:32+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T08:59:59Z", + "2020-01-01T08:59:59+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T09:00:00Z", + "2020-01-01T09:00:00+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), - ("2020-01-01T09:00:01Z", "0 1 * * *", []), + ( + "2020-01-01T09:00:01+00:00", + "0 1 * * *", + [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")], + ), + ("2020-01-01T09:00:30+00:00", "0 1 * * *", []), ], ) def test_cron_schedule_window_los_angeles( @@ -53,34 +56,40 @@ def test_cron_schedule_window_los_angeles( Reports scheduler: Test cron schedule window for "America/Los_Angeles" """ - with freeze_time(current_dttm): - datetimes = cron_schedule_window(cron, "America/Los_Angeles") - assert ( - list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) - == expected - ) + datetimes = cron_schedule_window( + datetime.fromisoformat(current_dttm), cron, "America/Los_Angeles" + ) + assert ( + list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected + ) @pytest.mark.parametrize( "current_dttm, cron, expected", [ - ("2020-01-01T00:59:01Z", "0 1 * * *", []), + ("2020-01-01T00:59:01+00:00", "0 1 * * *", []), + ("2020-01-01T00:59:02+00:00", "0 1 * * *", []), + ( + "2020-01-01T00:59:59+00:00", + "0 1 * * *", + [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")], + ), ( - "2020-01-01T00:59:02Z", + "2020-01-01T01:00:00+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T00:59:59Z", + "2020-01-01T01:00:01+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T01:00:00Z", + "2020-01-01T01:00:29+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), - ("2020-01-01T01:00:01Z", "0 1 * * *", []), + ("2020-01-01T01:00:30+00:00", "0 1 * * *", []), ], ) def test_cron_schedule_window_invalid_timezone( @@ -90,35 +99,41 @@ def test_cron_schedule_window_invalid_timezone( Reports scheduler: Test cron schedule window for "invalid timezone" """ - with freeze_time(current_dttm): - datetimes = cron_schedule_window(cron, "invalid timezone") - # it should default to UTC - assert ( - list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) - == expected - ) + datetimes = cron_schedule_window( + datetime.fromisoformat(current_dttm), cron, "invalid timezone" + ) + # it should default to UTC + assert ( + list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected + ) @pytest.mark.parametrize( "current_dttm, cron, expected", [ - ("2020-01-01T05:59:01Z", "0 1 * * *", []), + ("2020-01-01T05:59:01+00:00", "0 1 * * *", []), + ("2020-01-01T05:59:02+00:00", "0 1 * * *", []), + ( + "2020-01-01T05:59:59+00:00", + "0 1 * * *", + [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], + ), ( - "2020-01-01T05:59:02Z", + "2020-01-01T06:00:00+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T5:59:59Z", + "2020-01-01T06:00:01+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T6:00:00", + "2020-01-01T06:00:29+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), - ("2020-01-01T6:00:01Z", "0 1 * * *", []), + ("2020-01-01T06:00:30+00:00", "0 1 * * *", []), ], ) def test_cron_schedule_window_new_york( @@ -128,34 +143,40 @@ def test_cron_schedule_window_new_york( Reports scheduler: Test cron schedule window for "America/New_York" """ - with freeze_time(current_dttm, tz_offset=0): - datetimes = cron_schedule_window(cron, "America/New_York") - assert ( - list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) - == expected - ) + datetimes = cron_schedule_window( + datetime.fromisoformat(current_dttm), cron, "America/New_York" + ) + assert ( + list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected + ) @pytest.mark.parametrize( "current_dttm, cron, expected", [ - ("2020-01-01T06:59:01Z", "0 1 * * *", []), + ("2020-01-01T06:59:01+00:00", "0 1 * * *", []), + ("2020-01-01T06:59:02+00:00", "0 1 * * *", []), + ( + "2020-01-01T06:59:59+00:00", + "0 1 * * *", + [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")], + ), ( - "2020-01-01T06:59:02Z", + "2020-01-01T07:00:00+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T06:59:59Z", + "2020-01-01T07:00:01+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T07:00:00", + "2020-01-01T07:00:29+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), - ("2020-01-01T07:00:01Z", "0 1 * * *", []), + ("2020-01-01T07:00:30+00:00", "0 1 * * *", []), ], ) def test_cron_schedule_window_chicago( @@ -165,34 +186,40 @@ def test_cron_schedule_window_chicago( Reports scheduler: Test cron schedule window for "America/Chicago" """ - with freeze_time(current_dttm, tz_offset=0): - datetimes = cron_schedule_window(cron, "America/Chicago") - assert ( - list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) - == expected - ) + datetimes = cron_schedule_window( + datetime.fromisoformat(current_dttm), cron, "America/Chicago" + ) + assert ( + list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected + ) @pytest.mark.parametrize( "current_dttm, cron, expected", [ - ("2020-07-01T05:59:01Z", "0 1 * * *", []), + ("2020-07-01T05:59:01+00:00", "0 1 * * *", []), + ("2020-07-01T05:59:02+00:00", "0 1 * * *", []), + ( + "2020-07-01T05:59:59+00:00", + "0 1 * * *", + [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], + ), ( - "2020-07-01T05:59:02Z", + "2020-07-01T06:00:00+00:00", "0 1 * * *", [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-07-01T05:59:59Z", + "2020-07-01T06:00:01+00:00", "0 1 * * *", [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-07-01T06:00:00", + "2020-07-01T06:00:29+00:00", "0 1 * * *", [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), - ("2020-07-01T06:00:01Z", "0 1 * * *", []), + ("2020-07-01T06:00:30+00:00", "0 1 * * *", []), ], ) def test_cron_schedule_window_chicago_daylight( @@ -202,9 +229,9 @@ def test_cron_schedule_window_chicago_daylight( Reports scheduler: Test cron schedule window for "America/Chicago" """ - with freeze_time(current_dttm, tz_offset=0): - datetimes = cron_schedule_window(cron, "America/Chicago") - assert ( - list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) - == expected - ) + datetimes = cron_schedule_window( + datetime.fromisoformat(current_dttm), cron, "America/Chicago" + ) + assert ( + list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected + )