From 9c840f44ac257586a6a5705d67fc008d12d0fba1 Mon Sep 17 00:00:00 2001 From: Jack Fragassi Date: Fri, 8 Sep 2023 16:01:47 -0700 Subject: [PATCH 1/7] Adjust the window --- superset/tasks/cron_util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/superset/tasks/cron_util.py b/superset/tasks/cron_util.py index 19d342ebdcf86..f76d0785ad0fb 100644 --- a/superset/tasks/cron_util.py +++ b/superset/tasks/cron_util.py @@ -40,8 +40,8 @@ def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]: utc = pytz_timezone("UTC") # convert the current time to the user's local time for comparison time_now = time_now.astimezone(tz) - start_at = time_now - timedelta(seconds=1) - stop_at = time_now + timedelta(seconds=window_size) + start_at = time_now - timedelta(seconds=window_size / 2) + stop_at = time_now + timedelta(seconds=window_size / 2) crons = croniter(cron, start_at) for schedule in crons.all_next(datetime): if schedule >= stop_at: From 34c6564f78f4e63acbc13b8301322a4b36c0fa9c Mon Sep 17 00:00:00 2001 From: Jack Fragassi Date: Tue, 12 Sep 2023 17:35:55 -0700 Subject: [PATCH 2/7] Use workaround to get time scheduler job was triggered + update tests --- superset/config.py | 3 + superset/tasks/cron_util.py | 8 +- superset/tasks/scheduler.py | 9 +- tests/unit_tests/tasks/test_cron_util.py | 145 ++++++++++++++--------- 4 files changed, 101 insertions(+), 64 deletions(-) diff --git a/superset/config.py b/superset/config.py index 18a61575783dc..ce7c375888044 100644 --- a/superset/config.py +++ b/superset/config.py @@ -914,6 +914,8 @@ class D3Format(TypedDict, total=False): [86400, "24 hours"], ] +CELERY_BEAT_SCHEDULER_EXPIRES = 60 * 60 * 24 * 7 # 1 week + # Default celery config is to use SQLA as a broker, in a production setting # you'll want to use a proper broker as specified here: # https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html @@ -942,6 +944,7 @@ class CeleryConfig: # pylint: disable=too-few-public-methods "reports.scheduler": { "task": "reports.scheduler", "schedule": crontab(minute="*", hour="*"), + "options": {"expires": CELERY_BEAT_SCHEDULER_EXPIRES}, }, "reports.prune_log": { "task": "reports.prune_log", diff --git a/superset/tasks/cron_util.py b/superset/tasks/cron_util.py index f76d0785ad0fb..f37daa22b5c0b 100644 --- a/superset/tasks/cron_util.py +++ b/superset/tasks/cron_util.py @@ -27,10 +27,10 @@ logger = logging.getLogger(__name__) -def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]: +def cron_schedule_window( + triggered_at: datetime, cron: str, timezone: str +) -> Iterator[datetime]: window_size = app.config["ALERT_REPORTS_CRON_WINDOW_SIZE"] - # create a time-aware datetime in utc - time_now = datetime.now(tz=dt_timezone.utc) try: tz = pytz_timezone(timezone) except UnknownTimeZoneError: @@ -39,7 +39,7 @@ def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]: logger.warning("Timezone %s was invalid. Falling back to 'UTC'", timezone) utc = pytz_timezone("UTC") # convert the current time to the user's local time for comparison - time_now = time_now.astimezone(tz) + time_now = triggered_at.astimezone(tz) start_at = time_now - timedelta(seconds=window_size / 2) stop_at = time_now + timedelta(seconds=window_size / 2) crons = croniter(cron, start_at) diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 90df90ff1594f..ed82ac54b055e 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from datetime import datetime, timedelta import logging from celery import Celery @@ -47,9 +48,15 @@ def scheduler() -> None: return with session_scope(nullpool=True) as session: active_schedules = ReportScheduleDAO.find_active(session) + triggered_at = ( + datetime.fromisoformat(scheduler.request.expires) + - timedelta(seconds=app.config["CELERY_BEAT_SCHEDULER_EXPIRES"]) + if scheduler.request.expires + else datetime.utcnow() + ) for active_schedule in active_schedules: for schedule in cron_schedule_window( - active_schedule.crontab, active_schedule.timezone + triggered_at, active_schedule.crontab, active_schedule.timezone ): logger.info( "Scheduling alert %s eta: %s", active_schedule.name, schedule diff --git a/tests/unit_tests/tasks/test_cron_util.py b/tests/unit_tests/tasks/test_cron_util.py index 5bc22273f544e..56f1258e30b57 100644 --- a/tests/unit_tests/tasks/test_cron_util.py +++ b/tests/unit_tests/tasks/test_cron_util.py @@ -14,11 +14,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from datetime import datetime import pytest -import pytz -from dateutil import parser -from freezegun import freeze_time from freezegun.api import FakeDatetime from superset.tasks.cron_util import cron_schedule_window @@ -27,23 +25,28 @@ @pytest.mark.parametrize( "current_dttm, cron, expected", [ - ("2020-01-01T08:59:01Z", "0 1 * * *", []), + ("2020-01-01T08:59:01+00:00", "0 1 * * *", []), ( - "2020-01-01T08:59:02Z", + "2020-01-01T08:59:32+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T08:59:59Z", + "2020-01-01T08:59:59+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T09:00:00Z", + "2020-01-01T09:00:00+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), - ("2020-01-01T09:00:01Z", "0 1 * * *", []), + ( + "2020-01-01T09:00:01+00:00", + "0 1 * * *", + [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")], + ), + ("2020-01-01T09:00:30+00:00", "0 1 * * *", []), ], ) def test_cron_schedule_window_los_angeles( @@ -53,34 +56,40 @@ def test_cron_schedule_window_los_angeles( Reports scheduler: Test cron schedule window for "America/Los_Angeles" """ - with freeze_time(current_dttm): - datetimes = cron_schedule_window(cron, "America/Los_Angeles") - assert ( - list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) - == expected - ) + datetimes = cron_schedule_window( + datetime.fromisoformat(current_dttm), cron, "America/Los_Angeles" + ) + assert ( + list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected + ) @pytest.mark.parametrize( "current_dttm, cron, expected", [ - ("2020-01-01T00:59:01Z", "0 1 * * *", []), + ("2020-01-01T00:59:01+00:00", "0 1 * * *", []), + ("2020-01-01T00:59:02+00:00", "0 1 * * *", []), + ( + "2020-01-01T00:59:59+00:00", + "0 1 * * *", + [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")], + ), ( - "2020-01-01T00:59:02Z", + "2020-01-01T01:00:00+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T00:59:59Z", + "2020-01-01T01:00:01+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T01:00:00Z", + "2020-01-01T01:00:29+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), - ("2020-01-01T01:00:01Z", "0 1 * * *", []), + ("2020-01-01T01:00:30+00:00", "0 1 * * *", []), ], ) def test_cron_schedule_window_invalid_timezone( @@ -90,35 +99,41 @@ def test_cron_schedule_window_invalid_timezone( Reports scheduler: Test cron schedule window for "invalid timezone" """ - with freeze_time(current_dttm): - datetimes = cron_schedule_window(cron, "invalid timezone") - # it should default to UTC - assert ( - list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) - == expected - ) + datetimes = cron_schedule_window( + datetime.fromisoformat(current_dttm), cron, "invalid timezone" + ) + # it should default to UTC + assert ( + list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected + ) @pytest.mark.parametrize( "current_dttm, cron, expected", [ - ("2020-01-01T05:59:01Z", "0 1 * * *", []), + ("2020-01-01T05:59:01+00:00", "0 1 * * *", []), + ("2020-01-01T05:59:02+00:00", "0 1 * * *", []), + ( + "2020-01-01T05:59:59+00:00", + "0 1 * * *", + [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], + ), ( - "2020-01-01T05:59:02Z", + "2020-01-01T06:00:00+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T5:59:59Z", + "2020-01-01T06:00:01+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T6:00:00", + "2020-01-01T06:00:29+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), - ("2020-01-01T6:00:01Z", "0 1 * * *", []), + ("2020-01-01T06:00:30+00:00", "0 1 * * *", []), ], ) def test_cron_schedule_window_new_york( @@ -128,34 +143,40 @@ def test_cron_schedule_window_new_york( Reports scheduler: Test cron schedule window for "America/New_York" """ - with freeze_time(current_dttm, tz_offset=0): - datetimes = cron_schedule_window(cron, "America/New_York") - assert ( - list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) - == expected - ) + datetimes = cron_schedule_window( + datetime.fromisoformat(current_dttm), cron, "America/New_York" + ) + assert ( + list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected + ) @pytest.mark.parametrize( "current_dttm, cron, expected", [ - ("2020-01-01T06:59:01Z", "0 1 * * *", []), + ("2020-01-01T06:59:01+00:00", "0 1 * * *", []), + ("2020-01-01T06:59:02+00:00", "0 1 * * *", []), + ( + "2020-01-01T06:59:59+00:00", + "0 1 * * *", + [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")], + ), ( - "2020-01-01T06:59:02Z", + "2020-01-01T07:00:00+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T06:59:59Z", + "2020-01-01T07:00:01+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-01-01T07:00:00", + "2020-01-01T07:00:29+00:00", "0 1 * * *", [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), - ("2020-01-01T07:00:01Z", "0 1 * * *", []), + ("2020-01-01T07:00:30+00:00", "0 1 * * *", []), ], ) def test_cron_schedule_window_chicago( @@ -165,34 +186,40 @@ def test_cron_schedule_window_chicago( Reports scheduler: Test cron schedule window for "America/Chicago" """ - with freeze_time(current_dttm, tz_offset=0): - datetimes = cron_schedule_window(cron, "America/Chicago") - assert ( - list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) - == expected - ) + datetimes = cron_schedule_window( + datetime.fromisoformat(current_dttm), cron, "America/Chicago" + ) + assert ( + list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected + ) @pytest.mark.parametrize( "current_dttm, cron, expected", [ - ("2020-07-01T05:59:01Z", "0 1 * * *", []), + ("2020-07-01T05:59:01+00:00", "0 1 * * *", []), + ("2020-07-01T05:59:02+00:00", "0 1 * * *", []), + ( + "2020-07-01T05:59:59+00:00", + "0 1 * * *", + [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], + ), ( - "2020-07-01T05:59:02Z", + "2020-07-01T06:00:00+00:00", "0 1 * * *", [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-07-01T05:59:59Z", + "2020-07-01T06:00:01+00:00", "0 1 * * *", [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), ( - "2020-07-01T06:00:00", + "2020-07-01T06:00:29+00:00", "0 1 * * *", [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")], ), - ("2020-07-01T06:00:01Z", "0 1 * * *", []), + ("2020-07-01T06:00:30+00:00", "0 1 * * *", []), ], ) def test_cron_schedule_window_chicago_daylight( @@ -202,9 +229,9 @@ def test_cron_schedule_window_chicago_daylight( Reports scheduler: Test cron schedule window for "America/Chicago" """ - with freeze_time(current_dttm, tz_offset=0): - datetimes = cron_schedule_window(cron, "America/Chicago") - assert ( - list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) - == expected - ) + datetimes = cron_schedule_window( + datetime.fromisoformat(current_dttm), cron, "America/Chicago" + ) + assert ( + list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected + ) From 92182942102a2bd79df87efa249a3bc038325aa0 Mon Sep 17 00:00:00 2001 From: Jack Fragassi Date: Tue, 12 Sep 2023 17:56:26 -0700 Subject: [PATCH 3/7] rm unused import --- superset/tasks/cron_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/superset/tasks/cron_util.py b/superset/tasks/cron_util.py index f37daa22b5c0b..329937fb82850 100644 --- a/superset/tasks/cron_util.py +++ b/superset/tasks/cron_util.py @@ -17,7 +17,7 @@ import logging from collections.abc import Iterator -from datetime import datetime, timedelta, timezone as dt_timezone +from datetime import datetime, timedelta from croniter import croniter from pytz import timezone as pytz_timezone, UnknownTimeZoneError From de410b76a65a0e9d3b26e99e73de6e93f762b933 Mon Sep 17 00:00:00 2001 From: Jack Fragassi Date: Tue, 12 Sep 2023 18:03:42 -0700 Subject: [PATCH 4/7] isort --- superset/tasks/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index ed82ac54b055e..c1ca6b6119e8e 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -14,8 +14,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from datetime import datetime, timedelta import logging +from datetime import datetime, timedelta from celery import Celery from celery.exceptions import SoftTimeLimitExceeded From 46850e742fba2d3594cd3c82c367a912f8e18b01 Mon Sep 17 00:00:00 2001 From: Jack Fragassi Date: Tue, 12 Sep 2023 18:38:56 -0700 Subject: [PATCH 5/7] Improve expires config readability --- superset/config.py | 4 ++-- superset/tasks/scheduler.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/superset/config.py b/superset/config.py index ce7c375888044..80203ba78e722 100644 --- a/superset/config.py +++ b/superset/config.py @@ -914,7 +914,7 @@ class D3Format(TypedDict, total=False): [86400, "24 hours"], ] -CELERY_BEAT_SCHEDULER_EXPIRES = 60 * 60 * 24 * 7 # 1 week +CELERY_BEAT_SCHEDULER_EXPIRES = timedelta(weeks=1) # Default celery config is to use SQLA as a broker, in a production setting # you'll want to use a proper broker as specified here: @@ -944,7 +944,7 @@ class CeleryConfig: # pylint: disable=too-few-public-methods "reports.scheduler": { "task": "reports.scheduler", "schedule": crontab(minute="*", hour="*"), - "options": {"expires": CELERY_BEAT_SCHEDULER_EXPIRES}, + "options": {"expires": int(CELERY_BEAT_SCHEDULER_EXPIRES.total_seconds())}, }, "reports.prune_log": { "task": "reports.prune_log", diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index c1ca6b6119e8e..c805e6dffa14d 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -50,7 +50,7 @@ def scheduler() -> None: active_schedules = ReportScheduleDAO.find_active(session) triggered_at = ( datetime.fromisoformat(scheduler.request.expires) - - timedelta(seconds=app.config["CELERY_BEAT_SCHEDULER_EXPIRES"]) + - app.config["CELERY_BEAT_SCHEDULER_EXPIRES"] if scheduler.request.expires else datetime.utcnow() ) From 91605c84758bb28111a68dd51c359b786068be5a Mon Sep 17 00:00:00 2001 From: Jack Fragassi Date: Tue, 12 Sep 2023 18:49:18 -0700 Subject: [PATCH 6/7] rm unused import --- superset/tasks/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index c805e6dffa14d..f3cc270b86347 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. import logging -from datetime import datetime, timedelta +from datetime import datetime from celery import Celery from celery.exceptions import SoftTimeLimitExceeded From 1b429abea6fc55a6bee54fee1c29519a6ca08fbf Mon Sep 17 00:00:00 2001 From: Jack Fragassi Date: Wed, 13 Sep 2023 11:02:57 -0700 Subject: [PATCH 7/7] Add comment --- superset/config.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/superset/config.py b/superset/config.py index 80203ba78e722..1145a7693f7c4 100644 --- a/superset/config.py +++ b/superset/config.py @@ -914,6 +914,8 @@ class D3Format(TypedDict, total=False): [86400, "24 hours"], ] +# This is used as a workaround for the alerts & reports scheduler task to get the time +# celery beat triggered it, see https://github.com/celery/celery/issues/6974 for details CELERY_BEAT_SCHEDULER_EXPIRES = timedelta(weeks=1) # Default celery config is to use SQLA as a broker, in a production setting