Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: code cleanup #4

Merged
merged 1 commit into from
Jan 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 87 additions & 39 deletions superset/db_engine_specs/ocient.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,31 @@
from superset.db_engine_specs.base import BaseEngineSpec

import pyocient
from pyocient import _STPoint, _STLinestring, _STPolygon
from ipaddress import IPv4Address, IPv6Address
from pyocient import _STPoint, _STLinestring, _STPolygon, TypeCodes
from superset import app
from typing import Any, Callable, Dict, List, NamedTuple, Tuple

# Ensure pyocient inherits Superset's logging level
superset_log_level = app.config['LOG_LEVEL']
pyocient.logger.setLevel(superset_log_level)


# Custom datatype conversion functions

def _to_hex(data: bytes) -> str:
"""
Converts the bytes object into a string of hexadecimal digits.

:param data: the bytes object
:returns: string of hexadecimal digits representing the bytes
"""
return data.hex()

def _polygon_to_json(polygon: _STPolygon) -> str:
"""
Converts the _STPolygon object into its JSON representation.

:param data: the polygon object
:returns: JSON representation of the polygon
"""
json_value = f'{str([[p.long, p.lat] for p in polygon.exterior])}'
if polygon.holes:
for hole in polygon.holes:
Expand All @@ -42,48 +53,66 @@ def _polygon_to_json(polygon: _STPolygon) -> str:
return json_value

def _linestring_to_json(linestring: _STLinestring) -> str:
"""
Converts the _STLinestring object into its JSON representation.

:param data: the linestring object
:returns: JSON representation of the linestring
"""
return f'{str([[p.long, p.lat] for p in linestring.points])}'

def _point_to_comma_delimited(point: _STPoint) -> str:
return f'{point.long}, {point.lat}'

"""
Returns the x and y coordinates as a comma delimited string.

# Map of datatypes that are not allowed in superset, but are allowed in our DB
# Key is datatype, value is function to map, in order to convert to a valid superset type
:param data: the point object
:returns: the x and y coordinates as a comma delimited string
"""
return f'{point.long}, {point.lat}'

_superset_conversion_dict = {
_STPoint: _point_to_comma_delimited,
_STLinestring: _linestring_to_json,
_STPolygon: _polygon_to_json,
IPv4Address: str,
IPv6Address: str,
bytes: _to_hex
# Sanitization function for column values
SanitizeFunc = Callable[[Any], Any]

# Represents a pair of a column index and the sanitization function
# to apply to its values.
PlacedSanitizeFunc = NamedTuple(
"PlacedSanitizeFunc",
[
("column_index", int),
("sanitize_func", SanitizeFunc),
],
)

# This map contains functions used to sanitize values for column types
# that cannot be processed natively by Superset.
#
# Superset serializes temporal objects using a custom serializer
# defined in superset/utils/core.py (#json_int_dttm_ser(...)). Other
# are serialized by the default JSON encoder.
_sanitized_ocient_type_codes: Dict[int, SanitizeFunc] = {
TypeCodes.BINARY: _to_hex,
TypeCodes.ST_POINT: _point_to_comma_delimited,
TypeCodes.IP: str,
TypeCodes.IPV4: str,
TypeCodes.ST_LINESTRING: _linestring_to_json,
TypeCodes.ST_POLYGON: _polygon_to_json,
alexclavel-ocient marked this conversation as resolved.
Show resolved Hide resolved
}

def _verify_datatype(elem):
"""Verifies that the type of elem is allowable in superset
Whether a datatype is allowable is determined by the builtin json.dumps method, and superset's json_int_dttm_ser found in superset/utils/core.py
NOTE: This method does not allow for inheritance, elem that inherit from an allowable type will not be seen as allowable

If the type is allowable, it is returned without modification
If the type is not allowable, the repr string of the elem is returned
NOTE: This may need to be changed, particularity with consideration to GIS data, which may be better presented as a list

def _find_columns_to_sanitize(cursor: Any) -> List[PlacedSanitizeFunc]:
"""
if type(elem) in _superset_conversion_dict:
return _superset_conversion_dict[type(elem)](elem)
return elem
Cleans the column value for consumption by Superset.

class OcientEngineSpec(BaseEngineSpec):
"""Engine spec for the Ocient Database

Modify label to syntax supported by Ocient:
- 'count' changed to '"count"'
- label containing ')' and '(' wrapped with double-quotes
- Add an 'A' to a Label beginning with '_'

:param cursor: the result set cursor
:returns: the list of tuples consisting of the column index and sanitization function
"""
return [
PlacedSanitizeFunc(i, _sanitized_ocient_type_codes[cursor.description[i][1]])
for i in range(len(cursor.description))
if cursor.description[i][1] in _sanitized_ocient_type_codes
]


class OcientEngineSpec(BaseEngineSpec):
engine = 'ocient'
engine_name = "Ocient"
#limit_method = LimitMethod.WRAP_SQL
Expand All @@ -110,7 +139,26 @@ def get_table_names(

@classmethod
def fetch_data(cls, cursor, lim=None):
data = super(OcientEngineSpec, cls).fetch_data(cursor)
if len(data) != 0 and type(data[0]).__name__ == 'Row':
data = [list(map(_verify_datatype, r)) for r in data]
return data
rows = super(OcientEngineSpec, cls).fetch_data(cursor)

if (len(rows)) == 0:
# No rows were produced
return rows

if type(rows[0]).__name__ != 'Row':
# TODO what else is returned here?
return rows

# Peek at the schema to determine which column values, if any,
# require sanitization.
columns_to_sanitize: List[PlacedSanitizeFunc] = _find_columns_to_sanitize(cursor)

if columns_to_sanitize:
# At least 1 column has to be sanitized.
for row in rows:
for info in columns_to_sanitize:
# Modify the element in-place.
v = row[info.column_index]
row[info.column_index] = info.sanitize_func(v)

return rows