-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Stop query in SQL Lab with impala engine #22635
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1129,8 +1129,10 @@ def CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC( # pylint: disable=invalid-name | |
TRACKING_URL_TRANSFORMER = lambda url: url | ||
|
||
|
||
# Interval between consecutive polls when using Hive Engine | ||
HIVE_POLL_INTERVAL = int(timedelta(seconds=5).total_seconds()) | ||
# customize the polling time of each engine. The default time is 5 seconds | ||
DB_POLL_INTERVAL_SECONDS = { | ||
"hive": int(timedelta(seconds=5).total_seconds()), | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't really define defaults in
In this case the default for "hive" in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, thank you for your suggestion. I can let the user set it without setting the default value. I want to change it to this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would add the type as that's the convention in DB_POLL_INTERVAL_SECONDS: Dict[str, int] = {} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed it to this way. It's true that mypy failed |
||
|
||
# Interval between consecutive polls when using Presto Engine | ||
# See here: https://github.com/dropbox/PyHive/blob/8eb0aeab8ca300f3024655419b93dad926c1a351/pyhive/presto.py#L93 # pylint: disable=line-too-long,useless-suppression | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -375,7 +375,15 @@ def handle_cursor( # pylint: disable=too-many-locals | |||||
last_log_line = len(log_lines) | ||||||
if needs_commit: | ||||||
session.commit() | ||||||
time.sleep(current_app.config["HIVE_POLL_INTERVAL"]) | ||||||
if sleep_interval := current_app.config.get("HIVE_POLL_INTERVAL"): | ||||||
logger.warning( | ||||||
"HIVE_POLL_INTERVAL is deprecated and will be removed in 3.0. Please use DB_POLL_INTERVAL instead" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The deprecation warning is incorrect:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I will check it carefully before submitting next time, and I will change it |
||||||
) | ||||||
else: | ||||||
sleep_interval = current_app.config["DB_POLL_INTERVAL_SECONDS"].get( | ||||||
cls.engine, 5 | ||||||
) | ||||||
time.sleep(sleep_interval) | ||||||
polled = cursor.poll() | ||||||
|
||||||
@classmethod | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -14,20 +14,31 @@ | |||||
# KIND, either express or implied. See the License for the | ||||||
# specific language governing permissions and limitations | ||||||
# under the License. | ||||||
import logging | ||||||
import re | ||||||
import time | ||||||
from datetime import datetime | ||||||
from typing import Any, Dict, List, Optional | ||||||
|
||||||
from flask import current_app | ||||||
from sqlalchemy.engine.reflection import Inspector | ||||||
from sqlalchemy.orm import Session | ||||||
|
||||||
from superset.constants import QUERY_EARLY_CANCEL_KEY | ||||||
from superset.db_engine_specs.base import BaseEngineSpec | ||||||
from superset.models.sql_lab import Query | ||||||
from superset.utils import core as utils | ||||||
|
||||||
logger = logging.getLogger(__name__) | ||||||
|
||||||
|
||||||
class ImpalaEngineSpec(BaseEngineSpec): | ||||||
"""Engine spec for Cloudera's Impala""" | ||||||
|
||||||
engine = "impala" | ||||||
engine_name = "Apache Impala" | ||||||
# Query 5543ffdf692b7d02:f78a944000000000: 3% Complete (17 out of 547) | ||||||
query_progress_r = re.compile(r".*Query.*: (?P<query_progress>[0-9]+)%.*") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: do we really need the leading and trailing
Suggested change
Also, I know this is in line with what's being done in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
|
||||||
_time_grain_expressions = { | ||||||
None: "{col}", | ||||||
|
@@ -63,3 +74,82 @@ def get_schema_names(cls, inspector: Inspector) -> List[str]: | |||||
if not row[0].startswith("_") | ||||||
] | ||||||
return schemas | ||||||
|
||||||
@classmethod | ||||||
def has_implicit_cancel(cls) -> bool: | ||||||
""" | ||||||
Return True if the live cursor handles the implicit cancelation of the query, | ||||||
False otherise. | ||||||
|
||||||
:return: Whether the live cursor implicitly cancels the query | ||||||
:see: handle_cursor | ||||||
""" | ||||||
|
||||||
return True | ||||||
|
||||||
@classmethod | ||||||
def execute( | ||||||
cls, | ||||||
cursor: Any, | ||||||
query: str, | ||||||
**kwargs: Any, # pylint: disable=unused-argument | ||||||
) -> None: | ||||||
try: | ||||||
cursor.execute_async(query) | ||||||
except Exception as ex: | ||||||
raise cls.get_dbapi_mapped_exception(ex) | ||||||
|
||||||
@classmethod | ||||||
def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None: | ||||||
"""Stop query and updates progress information""" | ||||||
|
||||||
query_id = query.id | ||||||
unfinished_states = ( | ||||||
"INITIALIZED_STATE", | ||||||
"RUNNING_STATE", | ||||||
) | ||||||
|
||||||
try: | ||||||
status = cursor.status() | ||||||
while status in unfinished_states: | ||||||
session.refresh(query) | ||||||
query = session.query(Query).filter_by(id=query_id).one() | ||||||
# if query cancelation was requested prior to the handle_cursor call, but | ||||||
# the query was still executed | ||||||
# modified in stop_query in views / core.py is reflected here. | ||||||
# stop query | ||||||
if query.extra.get(QUERY_EARLY_CANCEL_KEY): | ||||||
cursor.cancel_operation() | ||||||
cursor.close_operation() | ||||||
cursor.close() | ||||||
break | ||||||
|
||||||
# updates progress info by log | ||||||
try: | ||||||
log = cursor.get_log() or "" | ||||||
except Exception: # pylint: disable=broad-except | ||||||
logger.warning("Call to GetLog() failed") | ||||||
log = "" | ||||||
|
||||||
if log: | ||||||
match = cls.query_progress_r.match(log) | ||||||
if match: | ||||||
progress = int(match.groupdict()["query_progress"]) | ||||||
logger.debug( | ||||||
"Query %s: Progress total: %s", str(query_id), str(progress) | ||||||
) | ||||||
needs_commit = False | ||||||
if progress > query.progress: | ||||||
query.progress = progress | ||||||
needs_commit = True | ||||||
|
||||||
if needs_commit: | ||||||
session.commit() | ||||||
sleep_interval = current_app.config["DB_POLL_INTERVAL_SECONDS"].get( | ||||||
cls.engine, 5 | ||||||
) | ||||||
time.sleep(sleep_interval) | ||||||
status = cursor.status() | ||||||
except Exception: # pylint: disable=broad-except | ||||||
logger.debug("Call to status() failed ") | ||||||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this is needed, as it's not changing the defaults.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can restore the previous code without changing this file