Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: Add feature to manually retry failed task #1417

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions flower/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from ..utils import tasks
from ..utils.broker import Broker
from ..utils.tasks import parse_args, parse_kwargs, make_json_serializable
from . import BaseApiHandler

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -636,3 +637,50 @@ def get(self, taskid):
response['worker'] = task.worker.hostname

self.write(response)

class TaskReapply(BaseTaskHandler):
@web.authenticated
async def post(self, taskid):
"""
Get task info and reapply the task with the same arguments.

:param taskid: ID of the task to reapply.
"""
# Get original task info
task = tasks.get_task_by_id(self.application.events, taskid)
if not task:
raise HTTPError(404, f"Unknown task '{taskid}'")

# Get task name
taskname = task.name
if not taskname:
raise HTTPError(400, "Cannot reapply task with no name")

try:
# Get the task object from registered tasks
task_obj = self.capp.tasks[taskname]
except KeyError as exc:
raise HTTPError(404, f"Unknown task '{taskname}'") from exc

# Parse args and kwargs from the original task
try:
args = parse_args(task.args)
kwargs = parse_kwargs(task.kwargs)
except Exception as exc:
logger.error("Error parsing task arguments: %s", exc)
raise HTTPError(400, f"Invalid task arguments: {str(exc)}") from exc

# Apply the task with original arguments
try:
# Ensure args and kwargs are JSON serializable
args = make_json_serializable(args)
kwargs = make_json_serializable(kwargs)

result = task_obj.apply_async(args=args, kwargs=kwargs)
response = {'task-id': result.task_id}
if self.backend_configured(result):
response.update(state=result.state)
self.write(response)
except Exception as exc:
logger.error("Error reapplying task with args=%s, kwargs=%s: %s", args, kwargs, str(exc))
raise HTTPError(500, f"Error reapplying task: {str(exc)}") from exc
33 changes: 33 additions & 0 deletions flower/static/js/flower.js
Original file line number Diff line number Diff line change
Expand Up @@ -690,4 +690,37 @@ var flower = (function () {

});

$('#task-retry').click(function () {
const $button = $(this);
const $spinner = $button.find('.spinner-border');
const taskId = $('#taskid').text();

if (!taskId) {
show_alert('Task ID is missing. Cannot proceed.', 'danger');
return;
}

// Show loading state
$button.prop('disabled', true);
$spinner.removeClass('d-none');

// Reapply the task using the reapply endpoint
$.ajax({
type: 'POST',
url: url_prefix() + '/api/task/reapply/' + taskId,
success: function (response) {
show_alert(`Task ${taskId} has been retried (new task ID: ${response['task-id']})`, 'success');
// Optionally reload the page after success
setTimeout(() => location.reload(), 1500);
},
error: function (response) {
show_alert(response.responseText || 'Failed to retry task', 'danger');
// Reset button state on error
$button.prop('disabled', false);
$spinner.addClass('d-none');
}
});
});


}(jQuery));
7 changes: 6 additions & 1 deletion flower/templates/task.html
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ <h2>{{ getattr(task, 'name', None) }}
<button class="btn btn-danger float-end" id="task-terminate">Terminate</button>
{% elif task.state == "RECEIVED" or task.state == "RETRY" %}
<button class="btn btn-danger float-end" id="task-revoke">Revoke</button>
{% elif task.state == "FAILURE" %}
<button class="btn btn-warning float-end" id="task-retry" data-bs-toggle="button">
<span class="spinner-border spinner-border-sm d-none" role="status" aria-hidden="true"></span>
Retry
</button>
{% end %}
</h2>
</div>
Expand Down Expand Up @@ -89,4 +94,4 @@ <h2>{{ getattr(task, 'name', None) }}
</div>
</div>
</div>
{% end %}
{% end %}
52 changes: 52 additions & 0 deletions flower/utils/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import time
import json

from .search import parse_search_terms, satisfies_search_terms

Expand Down Expand Up @@ -68,3 +69,54 @@ def get_task_by_id(events, task_id):

def as_dict(task):
return task.as_dict()

def parse_args(args):
"""
Parse and process the `args` of the task.
"""
if not args:
return []
try:
# Attempt to parse JSON
parsed_args = json.loads(args)
if isinstance(parsed_args, str) and parsed_args.startswith('(') and parsed_args.endswith(')'):
return eval(parsed_args) # Handle stringified tuples
return parsed_args
except (json.JSONDecodeError, SyntaxError):
# Fallback for stringified tuples or ellipsis
if args == '...':
return [...]
if args.startswith('(') and args.endswith(')'):
return eval(args)
return [args]

def parse_kwargs(kwargs):
"""
Parse and process the `kwargs` of the task.
"""
if not kwargs:
return {}
try:
# Attempt to parse JSON
return json.loads(kwargs)
except json.JSONDecodeError:
try:
# Fallback for stringified dictionaries
import ast
if kwargs.startswith('{') and kwargs.endswith('}'):
return ast.literal_eval(kwargs)
except (ValueError, SyntaxError):
return {}
return {}

def make_json_serializable(obj):
"""
Recursively replace non-serializable types with JSON-serializable alternatives.
"""
if isinstance(obj, list):
return [make_json_serializable(item) for item in obj]
elif isinstance(obj, dict):
return {key: make_json_serializable(value) for key, value in obj.items()}
elif obj is Ellipsis:
return None # Replace `...` with `null`
return obj # Return the object if it's already serializable