Skip to content

Commit

Permalink
fix: Use Celery task ETA for alert/report schedule (#24537)
Browse files Browse the repository at this point in the history
  • Loading branch information
john-bodley authored Jun 28, 2023
1 parent 0986423 commit e402c94
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 13 deletions.
15 changes: 4 additions & 11 deletions superset/tasks/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
from dateutil import parser

from superset import app, is_feature_enabled
from superset.commands.exceptions import CommandException
Expand Down Expand Up @@ -64,21 +63,15 @@ def scheduler() -> None:
active_schedule.working_timeout
+ app.config["ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG"]
)
execute.apply_async(
(
active_schedule.id,
schedule,
),
**async_options,
)
execute.apply_async((active_schedule.id,), **async_options)


@celery_app.task(name="reports.execute", bind=True)
def execute(self: Celery.task, report_schedule_id: int, scheduled_dttm: str) -> None:
def execute(self: Celery.task, report_schedule_id: int) -> None:
task_id = None
try:
task_id = execute.request.id
scheduled_dttm_ = parser.parse(scheduled_dttm)
scheduled_dttm = execute.request.eta
logger.info(
"Executing alert/report, task id: %s, scheduled_dttm: %s",
task_id,
Expand All @@ -87,7 +80,7 @@ def execute(self: Celery.task, report_schedule_id: int, scheduled_dttm: str) ->
AsyncExecuteReportScheduleCommand(
task_id,
report_schedule_id,
scheduled_dttm_,
scheduled_dttm,
).run()
except ReportScheduleUnexpectedError:
logger.exception(
Expand Down
4 changes: 2 additions & 2 deletions tests/integration_tests/reports/scheduler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def test_execute_task(update_state_mock, command_mock, init_mock, owners):
init_mock.return_value = None
command_mock.side_effect = ReportScheduleUnexpectedError("Unexpected error")
with freeze_time("2020-01-01T09:00:00Z"):
execute(report_schedule.id, "2020-01-01T09:00:00Z")
execute(report_schedule.id)
update_state_mock.assert_called_with(state="FAILURE")

db.session.delete(report_schedule)
Expand Down Expand Up @@ -199,7 +199,7 @@ def test_execute_task_with_command_exception(
init_mock.return_value = None
command_mock.side_effect = CommandException("Unexpected error")
with freeze_time("2020-01-01T09:00:00Z"):
execute(report_schedule.id, "2020-01-01T09:00:00Z")
execute(report_schedule.id)
update_state_mock.assert_called_with(state="FAILURE")
logger_mock.exception.assert_called_with(
"A downstream exception occurred while generating a report: None. Unexpected error",
Expand Down

0 comments on commit e402c94

Please sign in to comment.