diff --git a/superset/db_engine_specs/ocient.py b/superset/db_engine_specs/ocient.py index 824ba10481131..a351390291f04 100644 --- a/superset/db_engine_specs/ocient.py +++ b/superset/db_engine_specs/ocient.py @@ -26,7 +26,10 @@ from pyocient import _STPoint, _STLinestring, _STPolygon, TypeCodes from superset import app from superset.models.core import Database +from superset.models.core import Database from typing import Any, Callable, Dict, List, NamedTuple, Tuple, Optional, Pattern + +from superset.models.sql_lab import Query # Ensure pyocient inherits Superset's logging level superset_log_level = app.config['LOG_LEVEL'] pyocient.logger.setLevel(superset_log_level) @@ -62,6 +65,11 @@ "The reference to column '(?P.*?)' is not valid" ) +# Store mapping of superset Query id -> cursor object +# These are inserted into the cache when executing the query +# They are then removed, either upon cancellation or query completion +cursor_cache: Dict[Query, pyocient.Cursor]= dict() + # Custom datatype conversion functions def _to_hex(data: bytes) -> str: @@ -215,7 +223,7 @@ class OcientEngineSpec(BaseEngineSpec): @classmethod def get_table_names( - cls, database: "Database", inspector: Inspector, schema: Optional[str] + cls, database: Database, inspector: Inspector, schema: Optional[str] ) -> List[str]: return sorted(inspector.get_table_names(schema)) @@ -229,7 +237,8 @@ def fetch_data(cls, cursor, lim=None): if type(rows[0]).__name__ != 'Row': # TODO what else is returned here? - return rows + pass + # Peek at the schema to determine which column values, if any, # require sanitization. @@ -243,4 +252,21 @@ def fetch_data(cls, cursor, lim=None): v = row[info.column_index] row[info.column_index] = info.sanitize_func(v) + # # We are done with this cursor so we can safely remove it from the cache + query_ids_to_delete = [q_id for q_id in cursor_cache if cursor_cache[q_id] == cursor] + for q_id in query_ids_to_delete: + del cursor_cache[q_id] return rows + + @classmethod + def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool: + + if query.id in cursor_cache: + cursor.execute(f'CANCEL {cursor_cache[query.id].query_id}') + # Query has been cancelled, so we can safely remove the cursor from the cache + del cursor_cache[query.id] + + return True + # If the query is not in the cache, it must have either been cancelled elsewhere or completed + else: + return False \ No newline at end of file