Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ability to cancel dead queries #1159

Merged
merged 2 commits into from
Feb 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 70 additions & 12 deletions querybook/server/datasources/query_execution.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime

from flask import abort, Response, redirect
from flask_login import current_user

Expand All @@ -20,11 +22,16 @@
from lib.form import validate_form
from lib.data_doc.meta import var_config_to_var_dict
from lib.data_doc.doc_types import DataDocMetaVarConfig
from const.query_execution import QueryExecutionExportStatus, QueryExecutionStatus
from const.datasources import (
RESOURCE_NOT_FOUND_STATUS_CODE,
INVALID_SEMANTIC_STATUS_CODE,
)
from const.query_execution import (
QueryExecutionExportStatus,
QueryExecutionStatus,
QUERY_EXECUTION_NAMESPACE,
StatementExecutionStatus,
)
from logic import (
query_execution as logic,
datadoc as datadoc_logic,
Expand Down Expand Up @@ -119,20 +126,71 @@ def get_query_execution(query_execution_id):

@register("/query_execution/<int:query_execution_id>/", methods=["DELETE"])
def cancel_query_execution(query_execution_id):
with DBSession() as session:
execution = logic.get_query_execution_by_id(query_execution_id, session=session)
verify_query_engine_permission(execution.engine_id, session=session)
execution_dict = execution.to_dict(True) if execution is not None else None
execution = logic.get_query_execution_by_id(query_execution_id)
api_assert(
execution is not None, f"Invalid query execution id {query_execution_id}"
)

requestor = current_user.id
api_assert(
requestor == execution_dict["uid"], "You can only cancel your own queries"
# Check if user has access to execution
verify_query_engine_permission(execution.engine_id)

# Check if the user is indeed the one who issued it
api_assert(current_user.id == execution.uid, "You can only cancel your own queries")

# Check if the execution is "RUNNING"
api_assert(
execution.status
in [
QueryExecutionStatus.INITIALIZED,
QueryExecutionStatus.RUNNING,
QueryExecutionStatus.DELIVERED,
],
"Execution is already completed",
)

def cancel_query_and_notify():
statement_executions = execution.statement_executions
if len(statement_executions) > 0:
logic.update_statement_execution(
statement_executions[-1].id,
status=StatementExecutionStatus.CANCEL,
completed_at=datetime.utcnow(),
)

execution_dict = logic.update_query_execution(
query_execution_id,
status=QueryExecutionStatus.CANCEL,
completed_at=datetime.utcnow(),
).to_dict()

socketio.emit(
"query_cancel",
execution_dict,
namespace=QUERY_EXECUTION_NAMESPACE,
room=query_execution_id,
)

if execution_dict and "task_id" in execution_dict:
task = run_query_task.AsyncResult(execution_dict["task_id"])
if task is not None:
task.abort()
if not execution.task_id:
cancel_query_and_notify()
return

task = run_query_task.AsyncResult(execution.task_id)

if task.state in (
"PENDING", # Task is unknown or haven't delivered to worker yet
"RECEIVED", # Rare case where task is received but not yet start
"RETRY", # Very unlikely case, because query normally do not retry
):

task.revoke() # last attempt to cancel it
cancel_query_and_notify()
elif task.state == "ABORTED":
# In this case, the task is already aborted, but the status is running
# We will update the DB status and do nothing about the task itself
cancel_query_and_notify()
else: # RUNNING, celery state is STARTED
# Do not update status and let the worker handle it
task.abort()


@register("/query_execution/search/", methods=["GET"])
Expand Down