diff --git a/superset/config.py b/superset/config.py index 8a1b5220db92d..a3fc3df00ef53 100644 --- a/superset/config.py +++ b/superset/config.py @@ -1131,8 +1131,8 @@ def CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC( # pylint: disable=invalid-name TRACKING_URL_TRANSFORMER = lambda url: url -# Interval between consecutive polls when using Hive Engine -HIVE_POLL_INTERVAL = int(timedelta(seconds=5).total_seconds()) +# customize the polling time of each engine +DB_POLL_INTERVAL_SECONDS: Dict[str, int] = {} # Interval between consecutive polls when using Presto Engine # See here: https://github.com/dropbox/PyHive/blob/8eb0aeab8ca300f3024655419b93dad926c1a351/pyhive/presto.py#L93 # pylint: disable=line-too-long,useless-suppression diff --git a/superset/db_engine_specs/hive.py b/superset/db_engine_specs/hive.py index c69908976728b..1d27978e9d23c 100644 --- a/superset/db_engine_specs/hive.py +++ b/superset/db_engine_specs/hive.py @@ -375,7 +375,15 @@ def handle_cursor( # pylint: disable=too-many-locals last_log_line = len(log_lines) if needs_commit: session.commit() - time.sleep(current_app.config["HIVE_POLL_INTERVAL"]) + if sleep_interval := current_app.config.get("HIVE_POLL_INTERVAL"): + logger.warning( + "HIVE_POLL_INTERVAL is deprecated and will be removed in 3.0. Please use DB_POLL_INTERVAL_SECONDS instead" + ) + else: + sleep_interval = current_app.config["DB_POLL_INTERVAL_SECONDS"].get( + cls.engine, 5 + ) + time.sleep(sleep_interval) polled = cursor.poll() @classmethod diff --git a/superset/db_engine_specs/impala.py b/superset/db_engine_specs/impala.py index 048588c046fd4..177a9728fe0f8 100644 --- a/superset/db_engine_specs/impala.py +++ b/superset/db_engine_specs/impala.py @@ -14,14 +14,25 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import logging +import re +import time from datetime import datetime from typing import Any, Dict, List, Optional +from flask import current_app from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.orm import Session +from superset.constants import QUERY_EARLY_CANCEL_KEY from superset.db_engine_specs.base import BaseEngineSpec +from superset.models.sql_lab import Query from superset.utils import core as utils +logger = logging.getLogger(__name__) +# Query 5543ffdf692b7d02:f78a944000000000: 3% Complete (17 out of 547) +QUERY_PROGRESS_REGEX = re.compile(r"Query.*: (?P[0-9]+)%") + class ImpalaEngineSpec(BaseEngineSpec): """Engine spec for Cloudera's Impala""" @@ -63,3 +74,82 @@ def get_schema_names(cls, inspector: Inspector) -> List[str]: if not row[0].startswith("_") ] return schemas + + @classmethod + def has_implicit_cancel(cls) -> bool: + """ + Return True if the live cursor handles the implicit cancelation of the query, + False otherise. + + :return: Whether the live cursor implicitly cancels the query + :see: handle_cursor + """ + + return True + + @classmethod + def execute( + cls, + cursor: Any, + query: str, + **kwargs: Any, # pylint: disable=unused-argument + ) -> None: + try: + cursor.execute_async(query) + except Exception as ex: + raise cls.get_dbapi_mapped_exception(ex) + + @classmethod + def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None: + """Stop query and updates progress information""" + + query_id = query.id + unfinished_states = ( + "INITIALIZED_STATE", + "RUNNING_STATE", + ) + + try: + status = cursor.status() + while status in unfinished_states: + session.refresh(query) + query = session.query(Query).filter_by(id=query_id).one() + # if query cancelation was requested prior to the handle_cursor call, but + # the query was still executed + # modified in stop_query in views / core.py is reflected here. + # stop query + if query.extra.get(QUERY_EARLY_CANCEL_KEY): + cursor.cancel_operation() + cursor.close_operation() + cursor.close() + break + + # updates progress info by log + try: + log = cursor.get_log() or "" + except Exception: # pylint: disable=broad-except + logger.warning("Call to GetLog() failed") + log = "" + + if log: + match = QUERY_PROGRESS_REGEX.match(log) + if match: + progress = int(match.groupdict()["query_progress"]) + logger.debug( + "Query %s: Progress total: %s", str(query_id), str(progress) + ) + needs_commit = False + if progress > query.progress: + query.progress = progress + needs_commit = True + + if needs_commit: + session.commit() + sleep_interval = current_app.config["DB_POLL_INTERVAL_SECONDS"].get( + cls.engine, 5 + ) + time.sleep(sleep_interval) + status = cursor.status() + except Exception: # pylint: disable=broad-except + logger.debug("Call to status() failed ") + return diff --git a/superset/views/core.py b/superset/views/core.py index d0db5e9b2e94d..34e59947e7ddf 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -67,6 +67,7 @@ SqlMetric, TableColumn, ) +from superset.constants import QUERY_EARLY_CANCEL_KEY from superset.dashboards.commands.importers.v0 import ImportDashboardsCommand from superset.dashboards.dao import DashboardDAO from superset.dashboards.permalink.commands.get import GetDashboardPermalinkCommand @@ -2318,6 +2319,9 @@ def stop_query(self) -> FlaskResponse: raise SupersetCancelQueryException("Could not cancel query") query.status = QueryStatus.STOPPED + # Add the stop identity attribute because the sqlalchemy thread is unsafe + # because of multiple updates to the status in the query table + query.set_extra_json_key(QUERY_EARLY_CANCEL_KEY, True) query.end_time = now_as_float() db.session.commit()