Skip to content

Commit

Permalink
feat: Logic added to limiting factor column in Query model (apache#13521
Browse files Browse the repository at this point in the history
)

* Sqllab limit

* Add migration script

* Set default values

* initial push

* revisions

* Update superset/views/core.py

Co-authored-by: Beto Dealmeida <roberto@dealmeida.net>

* moving migration to separate PR

* with migration

* revisions

* Fix apply_limit_to_sql

* all but tests

* added unit tests

* revisions

* Update superset/sql_lab.py

Co-authored-by: Beto Dealmeida <roberto@dealmeida.net>

* Update superset/sql_parse.py

Co-authored-by: Beto Dealmeida <roberto@dealmeida.net>

* fixed black issue

* Update superset/views/core.py

Co-authored-by: Beto Dealmeida <roberto@dealmeida.net>

* updated logic

Co-authored-by: Beto Dealmeida <roberto@dealmeida.net>
  • Loading branch information
AAfghahi and betodealmeida authored Apr 30, 2021
1 parent a69143b commit eb2ce1a
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 21 deletions.
1 change: 1 addition & 0 deletions superset-frontend/src/SqlLab/reducers/sqlLab.js
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ export default function sqlLabReducer(state = {}, action) {
results: action.results,
rows: action?.results?.data?.length,
state: 'success',
limitingFactor: action?.results?.query?.limitingFactor,
tempSchema: action?.results?.query?.tempSchema,
tempTable: action?.results?.query?.tempTable,
errorMessage: null,
Expand Down
1 change: 1 addition & 0 deletions superset-frontend/src/SqlLab/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ export type Query = {
templateParams: any;
rows: number;
queryLimit: number;
limitingFactor: string;
};
6 changes: 4 additions & 2 deletions superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,9 @@ def extra_table_metadata(
return {}

@classmethod
def apply_limit_to_sql(cls, sql: str, limit: int, database: "Database") -> str:
def apply_limit_to_sql(
cls, sql: str, limit: int, database: "Database", force: bool = False
) -> str:
"""
Alters the SQL statement to apply a LIMIT clause
Expand All @@ -590,7 +592,7 @@ def apply_limit_to_sql(cls, sql: str, limit: int, database: "Database") -> str:

if cls.limit_method == LimitMethod.FORCE_LIMIT:
parsed_query = sql_parse.ParsedQuery(sql)
sql = parsed_query.set_or_update_query_limit(limit)
sql = parsed_query.set_or_update_query_limit(limit, force=force)

return sql

Expand Down
6 changes: 4 additions & 2 deletions superset/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,10 @@ def select_star( # pylint: disable=too-many-arguments
cols=cols,
)

def apply_limit_to_sql(self, sql: str, limit: int = 1000) -> str:
return self.db_engine_spec.apply_limit_to_sql(sql, limit, self)
def apply_limit_to_sql(
self, sql: str, limit: int = 1000, force: bool = False
) -> str:
return self.db_engine_spec.apply_limit_to_sql(sql, limit, self, force=force)

def safe_sqlalchemy_uri(self) -> str:
return self.sqlalchemy_uri
Expand Down
14 changes: 14 additions & 0 deletions superset/models/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
"""A collection of ORM sqlalchemy models for SQL Lab"""
import enum
import re
from datetime import datetime
from typing import Any, Dict, List
Expand All @@ -29,6 +30,7 @@
Boolean,
Column,
DateTime,
Enum,
ForeignKey,
Integer,
Numeric,
Expand All @@ -49,6 +51,14 @@
from superset.utils.core import QueryStatus, user_label


class LimitingFactor(str, enum.Enum):
QUERY = "QUERY"
DROPDOWN = "DROPDOWN"
QUERY_AND_DROPDOWN = "QUERY_AND_DROPDOWN"
NOT_LIMITED = "NOT_LIMITED"
UNKNOWN = "UNKNOWN"


class Query(Model, ExtraJSONMixin):
"""ORM model for SQL query
Expand Down Expand Up @@ -76,6 +86,9 @@ class Query(Model, ExtraJSONMixin):
executed_sql = Column(Text)
# Could be configured in the superset config.
limit = Column(Integer)
limiting_factor = Column(
Enum(LimitingFactor), server_default=LimitingFactor.UNKNOWN
)
select_as_cta = Column(Boolean)
select_as_cta_used = Column(Boolean, default=False)
ctas_method = Column(String(16), default=CtasMethod.TABLE)
Expand Down Expand Up @@ -120,6 +133,7 @@ def to_dict(self) -> Dict[str, Any]:
"id": self.client_id,
"queryId": self.id,
"limit": self.limit,
"limitingFactor": self.limiting_factor,
"progress": self.progress,
"rows": self.rows,
"schema": self.schema,
Expand Down
22 changes: 17 additions & 5 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from superset.db_engine_specs import BaseEngineSpec
from superset.extensions import celery_app
from superset.models.core import Database
from superset.models.sql_lab import Query
from superset.models.sql_lab import LimitingFactor, Query
from superset.result_set import SupersetResultSet
from superset.sql_parse import CtasMethod, ParsedQuery
from superset.utils.celery import session_scope
Expand Down Expand Up @@ -179,7 +179,7 @@ def get_sql_results( # pylint: disable=too-many-arguments
return handle_query_error(str(ex), query, session)


# pylint: disable=too-many-arguments
# pylint: disable=too-many-arguments, too-many-locals
def execute_sql_statement(
sql_statement: str,
query: Query,
Expand All @@ -194,6 +194,10 @@ def execute_sql_statement(
db_engine_spec = database.db_engine_spec
parsed_query = ParsedQuery(sql_statement)
sql = parsed_query.stripped()
# This is a test to see if the query is being
# limited by either the dropdown or the sql.
# We are testing to see if more rows exist than the limit.
increased_limit = None if query.limit is None else query.limit + 1

if not db_engine_spec.is_readonly_query(parsed_query) and not database.allow_dml:
raise SqlLabSecurityException(
Expand All @@ -219,11 +223,14 @@ def execute_sql_statement(
if SQL_MAX_ROW and (not query.limit or query.limit > SQL_MAX_ROW):
query.limit = SQL_MAX_ROW
if query.limit:
sql = database.apply_limit_to_sql(sql, query.limit)
# We are fetching one more than the requested limit in order
# to test whether there are more rows than the limit.
# Later, the extra row will be dropped before sending
# the results back to the user.
sql = database.apply_limit_to_sql(sql, increased_limit, force=True)

# Hook to allow environment-specific mutation (usually comments) to the SQL
sql = SQL_QUERY_MUTATOR(sql, user_name, security_manager, database)

try:
if log_query:
log_query(
Expand All @@ -249,7 +256,12 @@ def execute_sql_statement(
query.id,
str(query.to_dict()),
)
data = db_engine_spec.fetch_data(cursor, query.limit)
data = db_engine_spec.fetch_data(cursor, increased_limit)
if query.limit is None or len(data) <= query.limit:
query.limiting_factor = LimitingFactor.NOT_LIMITED
else:
# return 1 row less than increased_query
data = data[:-1]
except Exception as ex:
logger.error("Query %d: %s", query.id, type(ex), exc_info=True)
logger.debug("Query %d: %s", query.id, ex)
Expand Down
6 changes: 3 additions & 3 deletions superset/sql_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def _extract_from_token( # pylint: disable=too-many-branches
if any(not self._is_identifier(token2) for token2 in item.tokens):
self._extract_from_token(item)

def set_or_update_query_limit(self, new_limit: int) -> str:
def set_or_update_query_limit(self, new_limit: int, force: bool = False) -> str:
"""Returns the query with the specified limit.
Does not change the underlying query if user did not apply the limit,
Expand All @@ -332,8 +332,8 @@ def set_or_update_query_limit(self, new_limit: int) -> str:
break
_, limit = statement.token_next(idx=limit_pos)
# Override the limit only when it exceeds the configured value.
if limit.ttype == sqlparse.tokens.Literal.Number.Integer and new_limit < int(
limit.value
if limit.ttype == sqlparse.tokens.Literal.Number.Integer and (
force or new_limit < int(limit.value)
):
limit.value = new_limit
elif limit.is_group:
Expand Down
30 changes: 26 additions & 4 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
from superset.models.dashboard import Dashboard
from superset.models.datasource_access_request import DatasourceAccessRequest
from superset.models.slice import Slice
from superset.models.sql_lab import Query, TabState
from superset.models.sql_lab import LimitingFactor, Query, TabState
from superset.models.user_attributes import UserAttribute
from superset.queries.dao import QueryDAO
from superset.security.analytics_db_safety import check_sqlalchemy_uri
Expand Down Expand Up @@ -2393,6 +2393,7 @@ def _sql_json_sync(
# Update saved query if needed
QueryDAO.update_saved_query_exec_info(query_id)

# TODO: set LimitingFactor to display?
payload = json.dumps(
apply_display_max_row_limit(data),
default=utils.pessimistic_json_iso_dttm_ser,
Expand Down Expand Up @@ -2548,6 +2549,12 @@ def sql_json_exec( # pylint: disable=too-many-statements,too-many-locals
if not (config.get("SQLLAB_CTAS_NO_LIMIT") and select_as_cta):
# set LIMIT after template processing
limits = [mydb.db_engine_spec.get_limit_from_sql(rendered_query), limit]
if limits[0] is None or limits[0] > limits[1]:
query.limiting_factor = LimitingFactor.DROPDOWN
elif limits[1] > limits[0]:
query.limiting_factor = LimitingFactor.QUERY
else: # limits[0] == limits[1]
query.limiting_factor = LimitingFactor.QUERY_AND_DROPDOWN
query.limit = min(lim for lim in limits if lim is not None)

# Flag for whether or not to expand data
Expand All @@ -2571,7 +2578,9 @@ def sql_json_exec( # pylint: disable=too-many-statements,too-many-locals
@has_access
@event_logger.log_this
@expose("/csv/<client_id>")
def csv(self, client_id: str) -> FlaskResponse: # pylint: disable=no-self-use
def csv( # pylint: disable=no-self-use,too-many-locals
self, client_id: str
) -> FlaskResponse:
"""Download the query results as csv."""
logger.info("Exporting CSV file [%s]", client_id)
query = db.session.query(Query).filter_by(client_id=client_id).one()
Expand Down Expand Up @@ -2599,8 +2608,21 @@ def csv(self, client_id: str) -> FlaskResponse: # pylint: disable=no-self-use
logger.info("Using pandas to convert to CSV")
else:
logger.info("Running a query to turn into CSV")
sql = query.select_sql or query.executed_sql
df = query.database.get_df(sql, query.schema)
if query.select_sql:
sql = query.select_sql
limit = None
else:
sql = query.executed_sql
limit = ParsedQuery(sql).limit
if limit is not None and query.limiting_factor in {
LimitingFactor.QUERY,
LimitingFactor.DROPDOWN,
LimitingFactor.QUERY_AND_DROPDOWN,
}:
# remove extra row from `increased_limit`
limit -= 1
df = query.database.get_df(sql, query.schema)[:limit]

csv_data = csv.df_to_escaped_csv(df, index=False, **config["CSV_EXPORT"])
quoted_csv_name = parse.quote(query.name)
response = CsvResponse(
Expand Down
13 changes: 13 additions & 0 deletions tests/db_engine_specs/base_engine_spec_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ def test_limit_query_with_limit_subquery(self): # pylint: disable=invalid-name
"SELECT * FROM (SELECT * FROM a LIMIT 10) LIMIT 1000",
)

def test_limit_query_without_force(self):
self.sql_limit_regex(
"SELECT * FROM a LIMIT 10", "SELECT * FROM a LIMIT 10", limit=11,
)

def test_limit_query_with_force(self):
self.sql_limit_regex(
"SELECT * FROM a LIMIT 10",
"SELECT * FROM a LIMIT 11",
limit=11,
force=True,
)

def test_limit_with_expr(self):
self.sql_limit_regex(
"""
Expand Down
9 changes: 7 additions & 2 deletions tests/db_engine_specs/base_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@

class TestDbEngineSpec(SupersetTestCase):
def sql_limit_regex(
self, sql, expected_sql, engine_spec_class=MySQLEngineSpec, limit=1000
self,
sql,
expected_sql,
engine_spec_class=MySQLEngineSpec,
limit=1000,
force=False,
):
main = Database(database_name="test_database", sqlalchemy_uri="sqlite://")
limited = engine_spec_class.apply_limit_to_sql(sql, limit, main)
limited = engine_spec_class.apply_limit_to_sql(sql, limit, main, force)
self.assertEqual(expected_sql, limited)
30 changes: 28 additions & 2 deletions tests/sqllab_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
from superset.db_engine_specs import BaseEngineSpec
from superset.errors import ErrorLevel, SupersetErrorType
from superset.models.core import Database
from superset.models.sql_lab import Query, SavedQuery
from superset.models.sql_lab import LimitingFactor, Query, SavedQuery
from superset.result_set import SupersetResultSet
from superset.sql_lab import (
execute_sql_statements,
execute_sql_statement,
get_sql_results,
SqlLabException,
SqlLabTimeoutException,
Expand Down Expand Up @@ -119,7 +120,6 @@ def test_sql_json_to_saved_query_info(self):
)
assert saved_query_.rows is not None
assert saved_query_.last_run == datetime.now()

# Rollback changes
db.session.delete(saved_query_)
db.session.commit()
Expand Down Expand Up @@ -507,18 +507,44 @@ def test_sql_limit(self):
"SELECT * FROM birth_names", client_id="sql_limit_2", query_limit=test_limit
)
self.assertEqual(len(data["data"]), test_limit)

data = self.run_sql(
"SELECT * FROM birth_names LIMIT {}".format(test_limit),
client_id="sql_limit_3",
query_limit=test_limit + 1,
)
self.assertEqual(len(data["data"]), test_limit)
self.assertEqual(data["query"]["limitingFactor"], LimitingFactor.QUERY)

data = self.run_sql(
"SELECT * FROM birth_names LIMIT {}".format(test_limit + 1),
client_id="sql_limit_4",
query_limit=test_limit,
)
self.assertEqual(len(data["data"]), test_limit)
self.assertEqual(data["query"]["limitingFactor"], LimitingFactor.DROPDOWN)

data = self.run_sql(
"SELECT * FROM birth_names LIMIT {}".format(test_limit),
client_id="sql_limit_5",
query_limit=test_limit,
)
self.assertEqual(len(data["data"]), test_limit)
self.assertEqual(
data["query"]["limitingFactor"], LimitingFactor.QUERY_AND_DROPDOWN
)

data = self.run_sql(
"SELECT * FROM birth_names", client_id="sql_limit_6", query_limit=10000,
)
self.assertEqual(len(data["data"]), 1200)
self.assertEqual(data["query"]["limitingFactor"], LimitingFactor.NOT_LIMITED)

data = self.run_sql(
"SELECT * FROM birth_names", client_id="sql_limit_7", query_limit=1200,
)
self.assertEqual(len(data["data"]), 1200)
self.assertEqual(data["query"]["limitingFactor"], LimitingFactor.NOT_LIMITED)

def test_query_api_filter(self) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/superset_test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
PRESTO_POLL_INTERVAL = 0.1
HIVE_POLL_INTERVAL = 0.1

SQL_MAX_ROW = 666
SQL_MAX_ROW = 10000
SQLLAB_CTAS_NO_LIMIT = True # SQL_MAX_ROW will not take affect for the CTA queries
FEATURE_FLAGS = {
**FEATURE_FLAGS,
Expand Down

0 comments on commit eb2ce1a

Please sign in to comment.