Skip to content

Commit

Permalink
Adding support for query cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
alexclavel-ocient committed Jan 24, 2023
1 parent 6e773cc commit 09ad890
Showing 1 changed file with 28 additions and 2 deletions.
30 changes: 28 additions & 2 deletions superset/db_engine_specs/ocient.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
from pyocient import _STPoint, _STLinestring, _STPolygon, TypeCodes
from superset import app
from superset.models.core import Database
from superset.models.core import Database
from typing import Any, Callable, Dict, List, NamedTuple, Tuple, Optional, Pattern

from superset.models.sql_lab import Query
# Ensure pyocient inherits Superset's logging level
superset_log_level = app.config['LOG_LEVEL']
pyocient.logger.setLevel(superset_log_level)
Expand Down Expand Up @@ -62,6 +65,11 @@
"The reference to column '(?P<column>.*?)' is not valid"
)

# Store mapping of superset Query id -> cursor object
# These are inserted into the cache when executing the query
# They are then removed, either upon cancellation or query completion
cursor_cache: Dict[Query, pyocient.Cursor]= dict()

# Custom datatype conversion functions

def _to_hex(data: bytes) -> str:
Expand Down Expand Up @@ -215,7 +223,7 @@ class OcientEngineSpec(BaseEngineSpec):

@classmethod
def get_table_names(
cls, database: "Database", inspector: Inspector, schema: Optional[str]
cls, database: Database, inspector: Inspector, schema: Optional[str]
) -> List[str]:
return sorted(inspector.get_table_names(schema))

Expand All @@ -229,7 +237,8 @@ def fetch_data(cls, cursor, lim=None):

if type(rows[0]).__name__ != 'Row':
# TODO what else is returned here?
return rows
pass


# Peek at the schema to determine which column values, if any,
# require sanitization.
Expand All @@ -243,4 +252,21 @@ def fetch_data(cls, cursor, lim=None):
v = row[info.column_index]
row[info.column_index] = info.sanitize_func(v)

# # We are done with this cursor so we can safely remove it from the cache
query_ids_to_delete = [q_id for q_id in cursor_cache if cursor_cache[q_id] == cursor]
for q_id in query_ids_to_delete:
del cursor_cache[q_id]
return rows

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:

if query.id in cursor_cache:
cursor.execute(f'CANCEL {cursor_cache[query.id].query_id}')
# Query has been cancelled, so we can safely remove the cursor from the cache
del cursor_cache[query.id]

return True
# If the query is not in the cache, it must have either been cancelled elsewhere or completed
else:
return False

0 comments on commit 09ad890

Please sign in to comment.