Skip to content

Commit

Permalink
feat: cancel db query on stop (#15403)
Browse files Browse the repository at this point in the history
* feat: cancel db query on stop

* fix pylint

* Add unit tests

* Do not bind multiple times

* Stop only running queries

* Postgres to cancel only the required query

* Remove extra log

* Add docstring

* Better types, docstring and naming

* Use python3 format strings

* Update superset/sql_lab.py

Co-authored-by: Beto Dealmeida <roberto@dealmeida.net>

* Add cancel_query_on_windows_unload option to database

* Return cancel_query as bool

Co-authored-by: Beto Dealmeida <roberto@dealmeida.net>
  • Loading branch information
koszti and betodealmeida authored Jul 13, 2021
1 parent a914e3c commit 02032ee
Show file tree
Hide file tree
Showing 16 changed files with 281 additions and 4 deletions.
13 changes: 13 additions & 0 deletions superset-frontend/src/SqlLab/components/SqlEditor.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class SqlEditor extends React.PureComponent {
WINDOW_RESIZE_THROTTLE_MS,
);

this.onBeforeUnload = this.onBeforeUnload.bind(this);
this.renderDropdown = this.renderDropdown.bind(this);
}

Expand All @@ -212,6 +213,7 @@ class SqlEditor extends React.PureComponent {
this.setState({ height: this.getSqlEditorHeight() });

window.addEventListener('resize', this.handleWindowResize);
window.addEventListener('beforeunload', this.onBeforeUnload);

// setup hotkeys
const hotkeys = this.getHotkeyConfig();
Expand All @@ -222,6 +224,7 @@ class SqlEditor extends React.PureComponent {

componentWillUnmount() {
window.removeEventListener('resize', this.handleWindowResize);
window.removeEventListener('beforeunload', this.onBeforeUnload);
}

onResizeStart() {
Expand All @@ -242,6 +245,16 @@ class SqlEditor extends React.PureComponent {
}
}

onBeforeUnload(event) {
if (
this.props.database?.extra_json?.cancel_query_on_windows_unload &&
this.props.latestQuery?.state === 'running'
) {
event.preventDefault();
this.stopQuery();
}
}

onSqlChanged(sql) {
this.setState({ sql });
this.setQueryEditorSqlWithDebounce(sql);
Expand Down
5 changes: 4 additions & 1 deletion superset-frontend/src/SqlLab/reducers/sqlLab.js
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,10 @@ export default function sqlLabReducer(state = {}, action) {
[actions.SET_DATABASES]() {
const databases = {};
action.databases.forEach(db => {
databases[db.id] = db;
databases[db.id] = {
...db,
extra_json: JSON.parse(db.extra || ''),
};
});
return { ...state, databases };
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,24 @@ const ExtraOptions = ({
/>
</div>
</StyledInputContainer>
<StyledInputContainer css={{ no_margin_bottom }}>
<div className="input-container">
<IndeterminateCheckbox
id="cancel_query_on_windows_unload"
indeterminate={false}
checked={!!db?.extra_json?.cancel_query_on_windows_unload}
onChange={onExtraInputChange}
labelText={t('Cancel query on window unload event')}
/>
<InfoTooltip
tooltip={t(
'Terminate running queries when browser window closed or navigated ' +
'to another page. Available for Presto, Hive, MySQL, Postgres and ' +
'Snowflake databases.',
)}
/>
</div>
</StyledInputContainer>
</Collapse.Panel>
<Collapse.Panel
header={
Expand Down
1 change: 1 addition & 0 deletions superset-frontend/src/views/CRUD/data/database/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export type DatabaseObject = {
}; // No field, holds schema and table timeout
allows_virtual_table_explore?: boolean; // in SQL Lab
schemas_allowed_for_csv_upload?: [] | string; // in Security
cancel_query_on_windows_unload?: boolean; // in Performance
version?: string;

// todo: ask beto where this should live
Expand Down
1 change: 1 addition & 0 deletions superset/databases/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class DatabaseRestApi(BaseSupersetModelRestApi):
"database_name",
"explore_database_id",
"expose_in_sqllab",
"extra",
"force_ctas_schema",
"id",
]
Expand Down
25 changes: 25 additions & 0 deletions superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1305,6 +1305,31 @@ def get_column_spec(
)
return None

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Select identifiers from the database engine that uniquely identifies the
queries to cancel. The identifier is typically a session id, process id
or similar.
:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: Query identifier
"""
return None

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
"""
Cancel query in the underlying database.
:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: Value returned by get_cancel_query_payload or set in
other life-cycle methods of the query
:return: True if query cancelled successfully, False otherwise
"""


# schema for adding a database by providing parameters instead of the
# full SQLAlchemy URI
Expand Down
32 changes: 32 additions & 0 deletions superset/db_engine_specs/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
from superset.errors import SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType

Expand Down Expand Up @@ -220,3 +221,34 @@ def get_column_spec( # type: ignore
return super().get_column_spec(
native_type, column_type_mappings=column_type_mappings
)

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Get MySQL connection ID that will be used to cancel all other running
queries in the same connection.
:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: MySQL Connection ID
"""
cursor.execute("SELECT CONNECTION_ID()")
row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
"""
Cancel query in the underlying database.
:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: MySQL Connection ID
:return: True if query cancelled successfully, False otherwise
"""
try:
cursor.execute(f"KILL CONNECTION {cancel_query_id}")
except Exception: # pylint: disable=broad-except
return False

return True
36 changes: 36 additions & 0 deletions superset/db_engine_specs/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
from superset.errors import SupersetErrorType
from superset.exceptions import SupersetException
from superset.models.sql_lab import Query
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType

Expand Down Expand Up @@ -296,3 +297,38 @@ def get_column_spec( # type: ignore
return super().get_column_spec(
native_type, column_type_mappings=column_type_mappings
)

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Get Postgres PID that will be used to cancel all other running
queries in the same session.
:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: Postgres PID
"""
cursor.execute("SELECT pg_backend_pid()")
row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
"""
Cancel query in the underlying database.
:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: Postgres PID
:return: True if query cancelled successfully, False otherwise
"""
try:
cursor.execute(
"SELECT pg_terminate_backend(pid) "
"FROM pg_stat_activity "
f"WHERE pid='{cancel_query_id}'"
)
except Exception: # pylint: disable=broad-except
return False

return True
32 changes: 32 additions & 0 deletions superset/db_engine_specs/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from superset.db_engine_specs.postgres import PostgresBaseEngineSpec
from superset.errors import SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils import core as utils

if TYPE_CHECKING:
Expand Down Expand Up @@ -128,3 +129,34 @@ def mutate_db_for_connection_test(database: "Database") -> None:
engine_params["connect_args"] = connect_args
extra["engine_params"] = engine_params
database.extra = json.dumps(extra)

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Get Snowflake session ID that will be used to cancel all other running
queries in the same session.
:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: Snowflake Session ID
"""
cursor.execute("SELECT CURRENT_SESSION()")
row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
"""
Cancel query in the underlying database.
:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: Snowflake Session ID
:return: True if query cancelled successfully, False otherwise
"""
try:
cursor.execute(f"SELECT SYSTEM$CANCEL_ALL_QUERIES({cancel_query_id})")
except Exception: # pylint: disable=broad-except
return False

return True
4 changes: 4 additions & 0 deletions superset/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,7 @@ def __init__(self, error: ValidationError):
extra={"messages": error.messages},
)
super().__init__(error)


class SupersetCancelQueryException(SupersetException):
pass
51 changes: 48 additions & 3 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def dummy_sql_query_mutator(
SQL_QUERY_MUTATOR = config.get("SQL_QUERY_MUTATOR") or dummy_sql_query_mutator
log_query = config["QUERY_LOGGER"]
logger = logging.getLogger(__name__)
cancel_query_key = "cancel_query"


class SqlLabException(Exception):
Expand All @@ -83,6 +84,10 @@ class SqlLabSecurityException(SqlLabException):
pass


class SqlLabQueryStoppedException(SqlLabException):
pass


def handle_query_error(
ex: Exception,
query: Query,
Expand Down Expand Up @@ -187,7 +192,7 @@ def get_sql_results( # pylint: disable=too-many-arguments
return handle_query_error(ex, query, session)


# pylint: disable=too-many-arguments, too-many-locals
# pylint: disable=too-many-arguments, too-many-locals, too-many-statements
def execute_sql_statement(
sql_statement: str,
query: Query,
Expand Down Expand Up @@ -288,6 +293,12 @@ def execute_sql_statement(
)
)
except Exception as ex:
# query is stopped in another thread/worker
# stopping raises expected exceptions which we should skip
session.refresh(query)
if query.status == QueryStatus.STOPPED:
raise SqlLabQueryStoppedException()

logger.error("Query %d: %s", query.id, type(ex), exc_info=True)
logger.debug("Query %d: %s", query.id, ex)
raise SqlLabException(db_engine_spec.extract_error_message(ex))
Expand Down Expand Up @@ -438,12 +449,17 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
with closing(engine.raw_connection()) as conn:
# closing the connection closes the cursor as well
cursor = conn.cursor()
cancel_query_id = db_engine_spec.get_cancel_query_id(cursor, query)
if cancel_query_id is not None:
query.set_extra_json_key(cancel_query_key, cancel_query_id)
session.commit()
statement_count = len(statements)
for i, statement in enumerate(statements):
# Check if stopped
query = get_query(query_id, session)
session.refresh(query)
if query.status == QueryStatus.STOPPED:
return None
payload.update({"status": query.status})
return payload

# For CTAS we create the table only on the last statement
apply_ctas = query.select_as_cta and (
Expand All @@ -466,6 +482,9 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
log_params,
apply_ctas,
)
except SqlLabQueryStoppedException:
payload.update({"status": QueryStatus.STOPPED})
return payload
except Exception as ex: # pylint: disable=broad-except
msg = str(ex)
prefix_message = (
Expand Down Expand Up @@ -562,3 +581,29 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
return payload

return None


def cancel_query(query: Query, user_name: Optional[str] = None) -> bool:
"""
Cancel a running query.
:param query: Query to cancel
:param user_name: Default username
:return: True if query cancelled successfully, False otherwise
"""
cancel_query_id = query.extra.get(cancel_query_key, None)
if cancel_query_id is None:
return False

database = query.database
engine = database.get_sqla_engine(
schema=query.schema,
nullpool=True,
user_name=user_name,
source=QuerySource.SQL_LAB,
)
db_engine_spec = database.db_engine_spec

with closing(engine.raw_connection()) as conn:
with closing(conn.cursor()) as cursor:
return db_engine_spec.cancel_query(cursor, query, cancel_query_id)
5 changes: 5 additions & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
CertificateException,
DatabaseNotFound,
SerializationError,
SupersetCancelQueryException,
SupersetErrorException,
SupersetErrorsException,
SupersetException,
Expand Down Expand Up @@ -2335,6 +2336,10 @@ def stop_query(self) -> FlaskResponse:
str(client_id),
)
return self.json_response("OK")

if not sql_lab.cancel_query(query, g.user.username if g.user else None):
raise SupersetCancelQueryException("Could not cancel query")

query.status = QueryStatus.STOPPED
db.session.commit()

Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/databases/api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def test_get_items(self):
"database_name",
"explore_database_id",
"expose_in_sqllab",
"extra",
"force_ctas_schema",
"id",
]
Expand Down
Loading

0 comments on commit 02032ee

Please sign in to comment.