
fix(mysql): handle string typed decimal results #24241

Merged 7 commits · Sep 29, 2023
Changes from 3 commits
26 changes: 25 additions & 1 deletion superset/db_engine_specs/base.py
@@ -314,6 +314,10 @@ class BaseEngineSpec:  # pylint: disable=too-many-public-methods
# engine-specific type mappings to check prior to the defaults
column_type_mappings: Tuple[ColumnTypeMapping, ...] = ()

# type-specific functions to mutate values received from the database.
# Needed on certain databases that return values in an unexpected format
column_type_mutators: dict[TypeEngine, Callable[[Any], Any]] = {}

# Does database support join-free timeslot grouping
time_groupby_inline = False
limit_method = LimitMethod.FORCE_LIMIT
@@ -737,7 +741,27 @@ def fetch_data(
try:
if cls.limit_method == LimitMethod.FETCH_MANY and limit:
return cursor.fetchmany(limit)
return cursor.fetchall()
data = cursor.fetchall()
description = cursor.description or []
column_type_mutators = {
row[0]: func
for row in description
if (
func := cls.column_type_mutators.get(
type(cls.get_sqla_column_type(cls.get_datatype(row[1])))
[Review comment from a Member] Can we add a comment that explains the logic here? This dict comprehension might be hard to understand without the context of this PR.

)
)
}
if column_type_mutators:
[Review comment from @zhaoyongjie, May 29, 2023] Typically decimal is a fixed-point data structure, but a native Python float is not, so some drivers return a string to represent the fixed-point value. column_type_mutators adds an extra transformation on the decimal column, so query performance might become lower than before.

From the before and after screenshots, I saw the coltypes field changed from 1 -> 0. Do we have some other approach that just changes the coltypes instead of mutating the entire column's values?

[Reply from @villebro (author)] Good catch about the coltypes, let me investigate why that's gone missing.

Regarding the query performance hit, another option is to make the frontend more resilient to receiving various types. In this case we could cast the value to a number in the frontend if the coltype is GenericDataType.NUMERIC. However, I'd prefer to do this type of transformation in the backend to ensure that the plugins don't have to deal with all the nuances that SQLAlchemy throws our way.

I'd be curious to hear other people's thoughts. @michael-s-molina @john-bodley any thoughts here?

[Reply from @villebro (author), May 30, 2023] It hit me that without this change, the linked issue won't be fixed, as CSV export happens purely in the backend. With this new pattern we could even normalize non-standard timestamps early in the flow, eliminating the need for doing it in the frontend, further decoupling the quirks of the analytical databases from the downstream logic.

[Reply from @villebro (author)] By the way, I just noticed we already have logic like this in the codebase:

# Peek at the schema to determine which column values, if any,
# require sanitization.
columns_to_sanitize: List[PlacedSanitizeFunc] = _find_columns_to_sanitize(
    cursor
)

indexes = {row[0]: idx for idx, row in enumerate(description)}
for row_idx, row in enumerate(data):
new_row = list(row)
for col, func in column_type_mutators.items():
col_idx = indexes[col]
new_row[col_idx] = func(row[col_idx])
data[row_idx] = tuple(new_row)

return data
except Exception as ex:
raise cls.get_dbapi_mapped_exception(ex) from ex
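The fetch_data change above can be condensed into a standalone sketch. The names apply_mutators and the string-keyed registry below are illustrative simplifications: the real spec keys the registry on SQLAlchemy types resolved via get_datatype/get_sqla_column_type, not on raw type-name strings.

```python
from decimal import Decimal
from typing import Any, Callable

# Hypothetical stand-in for the engine spec's registry: maps a column
# type name to a function applied to every value in that column.
column_type_mutators: dict[str, Callable[[Any], Any]] = {
    "decimal": lambda val: Decimal(val) if isinstance(val, str) else val,
}

def apply_mutators(
    data: list[tuple[Any, ...]],
    description: list[tuple[str, str]],  # (column name, type name) pairs
) -> list[tuple[Any, ...]]:
    # Build {column name: mutator} for columns whose type has a registered
    # mutator, mirroring the dict comprehension in fetch_data.
    mutators = {
        name: func
        for name, type_name in description
        if (func := column_type_mutators.get(type_name.split("(")[0]))
    }
    if not mutators:
        return data  # fast path: nothing to rewrite
    indexes = {name: idx for idx, (name, _) in enumerate(description)}
    out = []
    for row in data:
        new_row = list(row)
        for col, func in mutators.items():
            col_idx = indexes[col]
            new_row[col_idx] = func(new_row[col_idx])
        out.append(tuple(new_row))
    return out

rows = apply_mutators(
    [("1.23456", "abc")],
    [("dec", "decimal(12,6)"), ("str", "varchar(3)")],
)
# rows[0][0] is now Decimal("1.23456")
```

Note that only rows containing a mutated column are rebuilt; columns without a registered mutator pass through untouched, which keeps the common case cheap.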

18 changes: 11 additions & 7 deletions superset/db_engine_specs/mysql.py
@@ -16,7 +16,8 @@
# under the License.
import re
from datetime import datetime
from typing import Any, Dict, Optional, Pattern, Tuple
from decimal import Decimal
from typing import Any, Callable, Optional, Pattern
from urllib import parse

from flask_babel import gettext as __
@@ -123,6 +124,9 @@ class MySQLEngineSpec(BaseEngineSpec, BasicParametersMixin):
GenericDataType.STRING,
),
)
column_type_mutators: dict[types.TypeEngine, Callable[[Any], Any]] = {
DECIMAL: lambda val: Decimal(val) if isinstance(val, str) else val
}

_time_grain_expressions = {
None: "{col}",
@@ -143,9 +147,9 @@ class MySQLEngineSpec(BaseEngineSpec, BasicParametersMixin):
"INTERVAL 1 DAY)) - 1 DAY))",
}

type_code_map: Dict[int, str] = {} # loaded from get_datatype only if needed
type_code_map: dict[int, str] = {} # loaded from get_datatype only if needed

custom_errors: Dict[Pattern[str], Tuple[str, SupersetErrorType, Dict[str, Any]]] = {
custom_errors: dict[Pattern[str], tuple[str, SupersetErrorType, dict[str, Any]]] = {
CONNECTION_ACCESS_DENIED_REGEX: (
__('Either the username "%(username)s" or the password is incorrect.'),
SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
@@ -186,7 +190,7 @@ class MySQLEngineSpec(BaseEngineSpec, BasicParametersMixin):

@classmethod
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
cls, target_type: str, dttm: datetime, db_extra: Optional[dict[str, Any]] = None
) -> Optional[str]:
sqla_type = cls.get_sqla_column_type(target_type)

@@ -201,10 +205,10 @@ def convert_dttm(
def adjust_engine_params(
cls,
uri: URL,
connect_args: Dict[str, Any],
connect_args: dict[str, Any],
catalog: Optional[str] = None,
schema: Optional[str] = None,
) -> Tuple[URL, Dict[str, Any]]:
) -> tuple[URL, dict[str, Any]]:
uri, new_connect_args = super().adjust_engine_params(
uri,
connect_args,
@@ -221,7 +225,7 @@ def adjust_engine_params(
def get_schema_from_engine_params(
cls,
sqlalchemy_uri: URL,
connect_args: Dict[str, Any],
connect_args: dict[str, Any],
) -> Optional[str]:
"""
Return the configured schema.
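The reviewer's point about decimal being fixed-point is easy to demonstrate, and it is the reason the mutator added in this file casts driver-supplied strings to Decimal rather than float:

```python
from decimal import Decimal

# Binary floats accumulate representation error on decimal fractions,
# which is unacceptable for a fixed-point DECIMAL column.
assert 0.1 + 0.1 + 0.1 != 0.3
# Decimal arithmetic is exact for these values.
assert Decimal("0.1") + Decimal("0.1") + Decimal("0.1") == Decimal("0.3")
```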
26 changes: 16 additions & 10 deletions superset/db_engine_specs/trino.py
@@ -17,12 +17,14 @@
from __future__ import annotations

import logging
from typing import Any, Dict, Optional, Type, TYPE_CHECKING
from decimal import Decimal
from typing import Any, Callable, Optional, Type, TYPE_CHECKING

import simplejson as json
from flask import current_app
from sqlalchemy.engine.url import URL
from sqlalchemy.orm import Session
from sqlalchemy.types import DECIMAL, TypeEngine

from superset.constants import QUERY_CANCEL_KEY, QUERY_EARLY_CANCEL_KEY, USER_AGENT
from superset.databases.utils import make_url_safe
@@ -48,13 +50,17 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
engine_name = "Trino"
allows_alias_to_source_column = False

column_type_mutators: dict[TypeEngine, Callable[[Any], Any]] = {
DECIMAL: lambda val: Decimal(val) if isinstance(val, str) else val
}

@classmethod
def extra_table_metadata(
cls,
database: Database,
table_name: str,
schema_name: Optional[str],
) -> Dict[str, Any]:
) -> dict[str, Any]:
metadata = {}

if indexes := database.get_indexes(table_name, schema_name):
@@ -95,7 +101,7 @@ def extra_table_metadata(
@classmethod
def update_impersonation_config(
cls,
connect_args: Dict[str, Any],
connect_args: dict[str, Any],
uri: str,
username: Optional[str],
) -> None:
@@ -131,7 +137,7 @@ def get_url_for_impersonation(
return url

@classmethod
def get_allow_cost_estimate(cls, extra: Dict[str, Any]) -> bool:
def get_allow_cost_estimate(cls, extra: dict[str, Any]) -> bool:
return True

@classmethod
@@ -199,17 +205,17 @@ def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
return True

@staticmethod
def get_extra_params(database: Database) -> Dict[str, Any]:
def get_extra_params(database: Database) -> dict[str, Any]:
"""
Some databases require adding elements to connection parameters,
like passing certificates to `extra`. This can be done here.

:param database: database instance from which to extract extras
:raises CertificateException: If certificate is not valid/unparseable
"""
extra: Dict[str, Any] = BaseEngineSpec.get_extra_params(database)
engine_params: Dict[str, Any] = extra.setdefault("engine_params", {})
connect_args: Dict[str, Any] = engine_params.setdefault("connect_args", {})
extra: dict[str, Any] = BaseEngineSpec.get_extra_params(database)
engine_params: dict[str, Any] = extra.setdefault("engine_params", {})
connect_args: dict[str, Any] = engine_params.setdefault("connect_args", {})

connect_args.setdefault("source", USER_AGENT)

@@ -222,7 +228,7 @@ def get_extra_params(database: Database) -> dict[str, Any]:
@staticmethod
def update_params_from_encrypted_extra(
database: Database,
params: Dict[str, Any],
params: dict[str, Any],
) -> None:
if not database.encrypted_extra:
return
Expand Down Expand Up @@ -262,7 +268,7 @@ def update_params_from_encrypted_extra(
raise ex

@classmethod
def get_dbapi_exception_mapping(cls) -> Dict[Type[Exception], Type[Exception]]:
def get_dbapi_exception_mapping(cls) -> dict[Type[Exception], Type[Exception]]:
# pylint: disable=import-outside-toplevel
from requests import exceptions as requests_exceptions

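Both the MySQL and Trino specs register the same DECIMAL mutator. A quick check that it is safe across driver behaviors — the mutate function below is just the spec's lambda written as a named function for illustration:

```python
from decimal import Decimal
from typing import Any

def mutate(val: Any) -> Any:
    # Only strings are converted; drivers that already return Decimal
    # (or NULLs) pass through untouched, so applying it is always safe.
    return Decimal(val) if isinstance(val, str) else val

assert mutate("1.23456") == Decimal("1.23456")
assert mutate(Decimal("1.23456")) == Decimal("1.23456")
assert mutate(None) is None  # non-string values pass through
```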
37 changes: 36 additions & 1 deletion tests/unit_tests/db_engine_specs/test_mysql.py
@@ -16,7 +16,8 @@
# under the License.

from datetime import datetime
from typing import Any, Dict, Optional, Tuple, Type
from decimal import Decimal
from typing import Any, Dict, Optional, Type
from unittest.mock import Mock, patch

import pytest
@@ -220,3 +221,37 @@ def test_get_schema_from_engine_params() -> None:
)
== "db1"
)


@pytest.mark.parametrize(
"data,description,expected_result",
[
(
[["1.23456", "abc"]],
[("dec", "decimal(12,6)"), ("str", "varchar(3)")],
[(Decimal("1.23456"), "abc")],
),
(
[[Decimal("1.23456"), "abc"]],
[("dec", "decimal(12,6)"), ("str", "varchar(3)")],
[(Decimal("1.23456"), "abc")],
),
(
[["1.23456", "abc"]],
[("dec", "varchar(255)"), ("str", "varchar(3)")],
[["1.23456", "abc"]],
),
],
)
def test_column_type_mutator(
data: list[tuple[Any, ...]],
description: list[Any],
expected_result: list[tuple[Any, ...]],
):
from superset.db_engine_specs.mysql import MySQLEngineSpec as spec

mock_cursor = Mock()
mock_cursor.fetchall.return_value = data
mock_cursor.description = description

assert spec.fetch_data(mock_cursor) == expected_result
39 changes: 38 additions & 1 deletion tests/unit_tests/db_engine_specs/test_trino.py
@@ -15,9 +15,12 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=unused-argument, import-outside-toplevel, protected-access
from __future__ import annotations

import json
from datetime import datetime
from typing import Any, Dict, Optional, Type
from decimal import Decimal
from typing import Any, Dict, Optional, Type, Union
from unittest.mock import Mock, patch

import pandas as pd
@@ -366,3 +369,37 @@ def test_handle_cursor_early_cancel(
assert cancel_query_mock.call_args[1]["cancel_query_id"] == query_id
else:
assert cancel_query_mock.call_args is None


@pytest.mark.parametrize(
"data,description,expected_result",
[
(
[["1.23456", "abc"]],
[("dec", "decimal(12,6)"), ("str", "varchar(3)")],
[(Decimal("1.23456"), "abc")],
[Review comment from @villebro (author)] Notice how we're changing from list[list[Any]] to list[tuple[Any, ...]] here; this is what SQLA drivers should return. This will save us from having to do the conversion later here:

# only do expensive recasting if datatype is not standard list of tuples
if data and (not isinstance(data, list) or not isinstance(data[0], tuple)):
    data = [tuple(row) for row in data]

),
(
[[Decimal("1.23456"), "abc"]],
[("dec", "decimal(12,6)"), ("str", "varchar(3)")],
[(Decimal("1.23456"), "abc")],
),
(
[["1.23456", "abc"]],
[("dec", "varchar(255)"), ("str", "varchar(3)")],
[["1.23456", "abc"]],
),
],
)
def test_column_type_mutator(
data: list[Union[tuple[Any, ...], list[Any]]],
description: list[Any],
expected_result: list[Union[tuple[Any, ...], list[Any]]],
):
from superset.db_engine_specs.trino import TrinoEngineSpec as spec

mock_cursor = Mock()
mock_cursor.fetchall.return_value = data
mock_cursor.description = description

assert spec.fetch_data(mock_cursor) == expected_result