From 879f52298b68a453b5497dad87bfc57ecf8675de Mon Sep 17 00:00:00 2001 From: Jordan Williams Date: Thu, 19 Jan 2023 23:39:45 +0000 Subject: [PATCH] chore: code cleanup * doc: add docstrings to functions * chore: clean up comments * perf: avoid copying row data that doesn't need to be sanitized --- superset/db_engine_specs/ocient.py | 126 ++++++++++++++++++++--------- 1 file changed, 87 insertions(+), 39 deletions(-) diff --git a/superset/db_engine_specs/ocient.py b/superset/db_engine_specs/ocient.py index 473eb6b14c62e..06bb6024ef390 100644 --- a/superset/db_engine_specs/ocient.py +++ b/superset/db_engine_specs/ocient.py @@ -20,20 +20,31 @@ from superset.db_engine_specs.base import BaseEngineSpec import pyocient -from pyocient import _STPoint, _STLinestring, _STPolygon -from ipaddress import IPv4Address, IPv6Address +from pyocient import _STPoint, _STLinestring, _STPolygon, TypeCodes from superset import app +from typing import Any, Callable, Dict, List, NamedTuple, Tuple +# Ensure pyocient inherits Superset's logging level superset_log_level = app.config['LOG_LEVEL'] pyocient.logger.setLevel(superset_log_level) -# Custom datatype conversion functions - def _to_hex(data: bytes) -> str: + """ + Converts the bytes object into a string of hexadecimal digits. + + :param data: the bytes object + :returns: string of hexadecimal digits representing the bytes + """ return data.hex() def _polygon_to_json(polygon: _STPolygon) -> str: + """ + Converts the _STPolygon object into its JSON representation. + + :param data: the polygon object + :returns: JSON representation of the polygon + """ json_value = f'{str([[p.long, p.lat] for p in polygon.exterior])}' if polygon.holes: for hole in polygon.holes: @@ -42,48 +53,66 @@ def _polygon_to_json(polygon: _STPolygon) -> str: return json_value def _linestring_to_json(linestring: _STLinestring) -> str: + """ + Converts the _STLinestring object into its JSON representation. + + :param data: the linestring object + :returns: JSON representation of the linestring + """ return f'{str([[p.long, p.lat] for p in linestring.points])}' def _point_to_comma_delimited(point: _STPoint) -> str: - return f'{point.long}, {point.lat}' - + """ + Returns the x and y coordinates as a comma delimited string. -# Map of datatypes that are not allowed in superset, but are allowed in our DB -# Key is datatype, value is function to map, in order to convert to a valid superset type + :param data: the point object + :returns: the x and y coordinates as a comma delimited string + """ + return f'{point.long}, {point.lat}' -_superset_conversion_dict = { - _STPoint: _point_to_comma_delimited, - _STLinestring: _linestring_to_json, - _STPolygon: _polygon_to_json, - IPv4Address: str, - IPv6Address: str, - bytes: _to_hex +# Sanitization function for column values +SanitizeFunc = Callable[[Any], Any] + +# Represents a pair of a column index and the sanitization function +# to apply to its values. +PlacedSanitizeFunc = NamedTuple( + "PlacedSanitizeFunc", + [ + ("column_index", int), + ("sanitize_func", SanitizeFunc), + ], +) + +# This map contains functions used to sanitize values for column types +# that cannot be processed natively by Superset. +# +# Superset serializes temporal objects using a custom serializer +# defined in superset/utils/core.py (#json_int_dttm_ser(...)). Other +# are serialized by the default JSON encoder. +_sanitized_ocient_type_codes: Dict[int, SanitizeFunc] = { + TypeCodes.BINARY: _to_hex, + TypeCodes.ST_POINT: _point_to_comma_delimited, + TypeCodes.IP: str, + TypeCodes.IPV4: str, + TypeCodes.ST_LINESTRING: _linestring_to_json, + TypeCodes.ST_POLYGON: _polygon_to_json, } -def _verify_datatype(elem): - """Verifies that the type of elem is allowable in superset - Whether a datatype is allowable is determined by the builtin json.dumps method, and superset's json_int_dttm_ser found in superset/utils/core.py - NOTE: This method does not allow for inheritance, elem that inherit from an allowable type will not be seen as allowable - - If the type is allowable, it is returned without modification - If the type is not allowable, the repr string of the elem is returned - NOTE: This may need to be changed, particularity with consideration to GIS data, which may be better presented as a list - +def _find_columns_to_sanitize(cursor: Any) -> List[PlacedSanitizeFunc]: """ - if type(elem) in _superset_conversion_dict: - return _superset_conversion_dict[type(elem)](elem) - return elem + Cleans the column value for consumption by Superset. -class OcientEngineSpec(BaseEngineSpec): - """Engine spec for the Ocient Database - - Modify label to syntax supported by Ocient: - - 'count' changed to '"count"' - - label containing ')' and '(' wrapped with double-quotes - - Add an 'A' to a Label beginning with '_' - + :param cursor: the result set cursor + :returns: the list of tuples consisting of the column index and sanitization function """ + return [ + PlacedSanitizeFunc(i, _sanitized_ocient_type_codes[cursor.description[i][1]]) + for i in range(len(cursor.description)) + if cursor.description[i][1] in _sanitized_ocient_type_codes + ] + +class OcientEngineSpec(BaseEngineSpec): engine = 'ocient' engine_name = "Ocient" #limit_method = LimitMethod.WRAP_SQL @@ -110,7 +139,26 @@ def get_table_names( @classmethod def fetch_data(cls, cursor, lim=None): - data = super(OcientEngineSpec, cls).fetch_data(cursor) - if len(data) != 0 and type(data[0]).__name__ == 'Row': - data = [list(map(_verify_datatype, r)) for r in data] - return data + rows = super(OcientEngineSpec, cls).fetch_data(cursor) + + if (len(rows)) == 0: + # No rows were produced + return rows + + if type(rows[0]).__name__ != 'Row': + # TODO what else is returned here? + return rows + + # Peek at the schema to determine which column values, if any, + # require sanitization. + columns_to_sanitize: List[PlacedSanitizeFunc] = _find_columns_to_sanitize(cursor) + + if columns_to_sanitize: + # At least 1 column has to be sanitized. + for row in rows: + for info in columns_to_sanitize: + # Modify the element in-place. + v = row[info.column_index] + row[info.column_index] = info.sanitize_func(v) + + return rows