Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Migrate /superset/stop_query/ to API v1 #22624

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,258 changes: 833 additions & 425 deletions docs/static/resources/openapi.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions superset-frontend/src/SqlLab/actions/sqlLab.js
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,9 @@ export function validateQuery(queryEditor, sql) {
export function postStopQuery(query) {
return function (dispatch) {
return SupersetClient.post({
endpoint: '/superset/stop_query/',
postPayload: { client_id: query.id },
stringify: false,
endpoint: '/api/v1/query/stop',
body: JSON.stringify({ client_id: query.id }),
headers: { 'Content-Type': 'application/json' },
})
.then(() => dispatch(stopQuery(query)))
.then(() => dispatch(addSuccessToast(t('Query was stopped.'))))
Expand Down
11 changes: 8 additions & 3 deletions superset-frontend/src/SqlLab/actions/sqlLab.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,15 @@ describe('async actions', () => {
});

describe('postStopQuery', () => {
const stopQueryEndpoint = 'glob:*/superset/stop_query/*';
const stopQueryEndpoint = 'glob:*/api/v1/query/stop';
fetchMock.post(stopQueryEndpoint, {});
const baseQuery = {
...query,
id: 'test_foo',
};

const makeRequest = () => {
const request = actions.postStopQuery(query);
const request = actions.postStopQuery(baseQuery);
return request(dispatch);
};

Expand All @@ -346,7 +350,8 @@ describe('async actions', () => {

return makeRequest().then(() => {
const call = fetchMock.calls(stopQueryEndpoint)[0];
expect(call[1].body.get('client_id')).toBe(query.id);
const body = JSON.parse(call[1].body);
expect(body.client_id).toBe(baseQuery.id);
});
});
});
Expand Down
1 change: 1 addition & 0 deletions superset/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class RouteMethod: # pylint: disable=too-few-public-methods
"get_data": "read",
"samples": "read",
"delete_ssh_tunnel": "write",
"stop_query": "read",
}

EXTRA_FORM_DATA_APPEND_KEYS = {
Expand Down
2 changes: 1 addition & 1 deletion superset/models/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ def get_sqla_col(self, col: Dict[str, Any]) -> Column:
col = sa.column(label, type_=col_type)
return self.make_sqla_column_compatible(col, label)

def get_sqla_query( # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements,unused-argument
def get_sqla_query( # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements
self,
apply_fetch_values_predicate: bool = False,
columns: Optional[List[Column]] = None,
Expand Down
74 changes: 72 additions & 2 deletions superset/queries/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,28 @@
# under the License.
import logging

import backoff
from flask_appbuilder.api import expose, protect, request, safe
from flask_appbuilder.models.sqla.interface import SQLAInterface

from superset import db, event_logger
from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod
from superset.databases.filters import DatabaseFilter
from superset.models.sql_lab import Query
from superset.queries.dao import QueryDAO
from superset.queries.filters import QueryFilter
from superset.queries.schemas import openapi_spec_methods_override, QuerySchema
from superset.views.base_api import BaseSupersetModelRestApi, RelatedFieldFilter
from superset.queries.schemas import (
openapi_spec_methods_override,
QuerySchema,
StopQuerySchema,
)
from superset.superset_typing import FlaskResponse
from superset.views.base_api import (
BaseSupersetModelRestApi,
RelatedFieldFilter,
requires_json,
statsd_metrics,
)
from superset.views.filters import BaseFilterRelatedUsers, FilterRelatedOwners

logger = logging.getLogger(__name__)
Expand All @@ -43,6 +57,7 @@ class QueryRestApi(BaseSupersetModelRestApi):
RouteMethod.GET_LIST,
RouteMethod.RELATED,
RouteMethod.DISTINCT,
"stop_query",
}

list_columns = [
Expand Down Expand Up @@ -95,9 +110,11 @@ class QueryRestApi(BaseSupersetModelRestApi):
base_filters = [["id", QueryFilter, lambda: []]]
base_order = ("changed_on", "desc")
list_model_schema = QuerySchema()
stop_query_schema = StopQuerySchema()

openapi_spec_tag = "Queries"
openapi_spec_methods = openapi_spec_methods_override
openapi_spec_component_schemas = (StopQuerySchema,)

order_columns = [
"changed_on",
Expand All @@ -123,3 +140,56 @@ class QueryRestApi(BaseSupersetModelRestApi):
base_related_field_filters = {"database": [["id", DatabaseFilter, lambda: []]]}
allowed_rel_fields = {"database", "user"}
allowed_distinct_fields = {"status"}

@expose("/stop", methods=["POST"])
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
f".stop_query",
log_to_statsd=False,
)
@backoff.on_exception(
backoff.constant,
Exception,
interval=1,
on_backoff=lambda details: db.session.rollback(),
on_giveup=lambda details: db.session.rollback(),
max_tries=5,
)
@requires_json
def stop_query(self) -> FlaskResponse:
"""Manually stop a query with client_id
---
post:
summary: Manually stop a query with client_id
requestBody:
description: Stop query schema
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/StopQuerySchema'
responses:
200:
description: Query stopped
content:
application/json:
schema:
type: object
properties:
result:
type: string
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
404:
$ref: '#/components/responses/404'
500:
$ref: '#/components/responses/500'
"""
body = self.stop_query_schema.load(request.json)
QueryDAO.stop_query(body["client_id"])
return self.response(200, result="OK")
24 changes: 24 additions & 0 deletions superset/queries/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
from datetime import datetime
from typing import Any, Dict

from superset import sql_lab
from superset.common.db_query_status import QueryStatus
from superset.dao.base import BaseDAO
from superset.exceptions import SupersetCancelQueryException
from superset.extensions import db
from superset.models.sql_lab import Query, SavedQuery
from superset.queries.filters import QueryFilter
from superset.utils.dates import now_as_float

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,3 +60,23 @@ def save_metadata(query: Query, payload: Dict[str, Any]) -> None:
columns = payload.get("columns", {})
db.session.add(query)
query.set_extra_json_key("columns", columns)

@staticmethod
def stop_query(client_id: str) -> None:
query = db.session.query(Query).filter_by(client_id=client_id).one()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an invalid client_id will raise, and the API would return HTTP 500, would be nicer to respond HTTP 404

if query.status in [
QueryStatus.FAILED,
QueryStatus.SUCCESS,
QueryStatus.TIMED_OUT,
]:
logger.warning(
"Query with client_id could not be stopped: query already complete",
)
return

if not sql_lab.cancel_query(query):
raise SupersetCancelQueryException("Could not cancel query")

query.status = QueryStatus.STOPPED
query.end_time = now_as_float()
db.session.commit()
8 changes: 8 additions & 0 deletions superset/queries/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,11 @@ class Meta: # pylint: disable=too-few-public-methods
# pylint: disable=no-self-use
def get_sql_tables(self, obj: Query) -> List[Table]:
return obj.sql_tables


class StopQuerySchema(Schema):
"""
Schema for the stop_query API call.
"""

client_id = fields.String()
1 change: 1 addition & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2296,6 +2296,7 @@ def results_exec(key: str) -> FlaskResponse:
on_giveup=lambda details: db.session.rollback(),
max_tries=5,
)
@deprecated()
def stop_query(self) -> FlaskResponse:
client_id = request.form.get("client_id")
query = db.session.query(Query).filter_by(client_id=client_id).one()
Expand Down
25 changes: 25 additions & 0 deletions tests/integration_tests/queries/api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# isort:skip_file
"""Unit tests for Superset"""
from datetime import datetime, timedelta
from unittest import mock
import json
import random
import string
Expand Down Expand Up @@ -392,3 +393,27 @@ def test_get_list_query_no_data_access(self):
# rollback changes
db.session.delete(query)
db.session.commit()

@mock.patch("superset.sql_lab.cancel_query")
@mock.patch("superset.views.core.db.session")
def test_stop_query(self, mock_superset_db_session, mock_sql_lab_cancel_query):
"""
Handles stop query when the DB engine spec does not
have a cancel query method.
"""
form_data = {"client_id": "foo"}
query_mock = mock.Mock()
query_mock.client_id = "foo"
query_mock.status = QueryStatus.RUNNING
self.login(username="admin")
mock_superset_db_session.query().filter_by().one().return_value = query_mock
mock_sql_lab_cancel_query.return_value = True
rv = self.client.post(
"/api/v1/query/stop",
data=json.dumps(form_data),
content_type="application/json",
)

assert rv.status_code == 200
data = json.loads(rv.data.decode("utf-8"))
assert data["result"] == "OK"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add one test for a non existent client_id

123 changes: 122 additions & 1 deletion tests/unit_tests/dao/queries_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
# specific language governing permissions and limitations
# under the License.
import json
from typing import Iterator
from typing import Any, Iterator

import pytest
from pytest_mock import MockFixture
from sqlalchemy.orm.session import Session

from superset.exceptions import SupersetCancelQueryException


def test_query_dao_save_metadata(session: Session) -> None:
from superset.models.core import Database
Expand Down Expand Up @@ -53,3 +56,121 @@ def test_query_dao_save_metadata(session: Session) -> None:
query = session.query(Query).one()
QueryDAO.save_metadata(query=query, payload={"columns": []})
assert query.extra.get("columns", None) == []


def test_query_dao_stop_query_not_running(
mocker: MockFixture, app: Any, session: Session
) -> None:
from superset.common.db_query_status import QueryStatus
from superset.models.core import Database
from superset.models.sql_lab import Query

engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member

db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")

query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
status=QueryStatus.FAILED,
)

session.add(db)
session.add(query_obj)

from superset.queries.dao import QueryDAO

QueryDAO.stop_query(query_obj.client_id)
query = session.query(Query).one()
assert query.status == QueryStatus.FAILED


def test_query_dao_stop_query_failed(
mocker: MockFixture, app: Any, session: Session
) -> None:
from superset.common.db_query_status import QueryStatus
from superset.models.core import Database
from superset.models.sql_lab import Query

engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member

db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")

query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
status=QueryStatus.RUNNING,
)

session.add(db)
session.add(query_obj)

mocker.patch("superset.sql_lab.cancel_query", return_value=False)

from superset.queries.dao import QueryDAO

with pytest.raises(SupersetCancelQueryException):
QueryDAO.stop_query(query_obj.client_id)

query = session.query(Query).one()
assert query.status == QueryStatus.RUNNING


def test_query_dao_stop_query(mocker: MockFixture, app: Any, session: Session) -> None:
from superset.common.db_query_status import QueryStatus
from superset.models.core import Database
from superset.models.sql_lab import Query

engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member

db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")

query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
status=QueryStatus.RUNNING,
)

session.add(db)
session.add(query_obj)

mocker.patch("superset.sql_lab.cancel_query", return_value=True)

from superset.queries.dao import QueryDAO

QueryDAO.stop_query(query_obj.client_id)
query = session.query(Query).one()
assert query.status == QueryStatus.STOPPED