Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Ocient support #22812

Merged
merged 30 commits into from
Apr 23, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a8b023c
feat: add OcientEngineSpec
jwilliams-ocient Jan 13, 2023
879f522
chore: code cleanup
jwilliams-ocient Jan 19, 2023
e53ef91
Merge pull request #4 from ocient/user/jwilliams/docstrings
jwilliams-ocient Jan 23, 2023
9e3ef85
Adding custom errors to Ocient engine spec
alexclavel-ocient Jan 23, 2023
818474d
Fixing syntax regex to match all ocient syntax errors
alexclavel-ocient Jan 23, 2023
e8c97ba
Cleaning up imports
alexclavel-ocient Jan 23, 2023
47a7678
Merge pull request #5 from ocient/user/aclavel/DB-22258
alexclavel-ocient Jan 23, 2023
d465588
chore: add Ocient dependencies to setup.py (#3)
jwilliams-ocient Jan 23, 2023
bee5368
fix: add missing comma to inline dict element
jwilliams-ocient Jan 24, 2023
6e773cc
Merge pull request #7 from ocient/jwilliams-ocient-patch-2
alexclavel-ocient Jan 24, 2023
ffb76bc
test: add unit tests for Ocient engine spec (#6)
jwilliams-ocient Jan 25, 2023
8059833
DB-23237 Superset: datatypes are not being converted properly (#8)
alexclavel-ocient Jan 30, 2023
6c1d956
DB-22748 Fix canceling queries in Superset (#2)
alexclavel-ocient Jan 30, 2023
6979aed
Run Black Formatting on Ocient DB Engine Spec (#9)
alexclavel-ocient Jan 30, 2023
a83a363
chore: add userdoc for Ocient DB (#10)
jwilliams-ocient Feb 6, 2023
4dd143c
fix: rename 'sqlalchemy_ocient' package to 'sqlalchemy-ocient' (#11)
jwilliams-ocient Feb 6, 2023
b075cc1
Merge pull request #13 from apache/master
jwilliams-ocient Feb 6, 2023
3ab8dff
Fix: Update Ocient docs with coorect package name (#14)
alexclavel-ocient Feb 7, 2023
75aa069
fix: apply suggestions from pre-commit hooks (#12)
jwilliams-ocient Feb 7, 2023
75e4e35
Fix: make cte_alias compatible with Ocient (#16)
alexclavel-ocient Feb 8, 2023
4d88c78
Adding Ocient to testing requirements (#18)
alexclavel-ocient Feb 28, 2023
c01ccda
Fix: make fetch_data check for rows properly (#19)
alexclavel-ocient Mar 13, 2023
1f1c5a3
build[pyocient]: set min version for pyocient (#20)
jwilliams-ocient Apr 11, 2023
30b13e5
docs: add example of setting DSN params (#21)
jwilliams-ocient Apr 12, 2023
b2ba77d
fix: recover from pyocient import errors. The library may not be inst…
jwilliams-ocient Apr 20, 2023
33891c4
fix: forgot to remove the unchecked imports at the top of the file...
jwilliams-ocient Apr 20, 2023
3f802d8
chore: remove potentially unknown type annotations
jwilliams-ocient Apr 20, 2023
d4a4d0b
chore: add epoch_to_dttm override
jwilliams-ocient Apr 20, 2023
787e303
test: fix broken unit tests (#23)
jwilliams-ocient Apr 20, 2023
e82386e
chore: pass pre-commit checks (#24)
jwilliams-ocient Apr 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions docs/docs/databases/ocient.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
title: Ocient DB
hide_title: true
sidebar_position: 20
version: 1
---

## Ocient DB

The recommended connector library for Ocient is [sqlalchemy-ocient](https://pypi.org/project/sqlalchemy-ocient).

## Install the Ocient Driver

```
pip install sqlalchemy_ocient
```

## Connecting to Ocient

The connection string for Ocient looks like this:

```shell
ocient://{user}:{password}@{DNSname}:{port}/{database}
```

**NOTE**: You must enter the `user` and `password` credentials. `host` defaults to localhost,
port defaults to 4050, database defaults to `system` and `tls` defaults
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is tls passed to the URL?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question! We should definitely provide an example here - will fix

to `off`.

## User Access Control

Make sure the user has privileges to access and use all required databases, schemas, tables, views, and warehouses, as the Ocient SQLAlchemy engine does not test for user or role rights by default.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def get_git_sha() -> str:
"kylin": ["kylinpy>=2.8.1, <2.9"],
"mssql": ["pymssql>=2.1.4, <2.2"],
"mysql": ["mysqlclient>=2.1.0, <3"],
"ocient": ["sqlalchemy-ocient>=1.0.0"],
"oracle": ["cx-Oracle>8.0.0, <8.1"],
"pinot": ["pinotdb>=0.3.3, <0.4"],
"postgres": ["psycopg2-binary==2.9.5"],
Expand Down
304 changes: 304 additions & 0 deletions superset/db_engine_specs/ocient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import re

from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm import Session
from superset.db_engine_specs.base import BaseEngineSpec
from superset.errors import SupersetErrorType
from flask_babel import gettext as __

import pyocient
from pyocient import _STPoint, _STLinestring, _STPolygon, TypeCodes
from superset import app
from superset.models.core import Database
from typing import Any, Callable, Dict, List, NamedTuple, Tuple, Optional, Pattern
import threading

from superset.models.sql_lab import Query

# Ensure pyocient inherits Superset's logging level
superset_log_level = app.config["LOG_LEVEL"]
pyocient.logger.setLevel(superset_log_level)
alexclavel-ocient marked this conversation as resolved.
Show resolved Hide resolved


# Regular expressions to catch custom errors

CONNECTION_INVALID_USERNAME_REGEX = re.compile(
"The referenced user does not exist \(User '(?P<username>.*?)' not found\)"
)
CONNECTION_INVALID_PASSWORD_REGEX = re.compile(
"The userid/password combination was not valid \(Incorrect password for user\)"
)
CONNECTION_INVALID_HOSTNAME_REGEX = re.compile(
r"Unable to connect to (?P<host>.*?):(?P<port>.*?):"
)
CONNECTION_UNKNOWN_DATABASE_REGEX = re.compile(
"No database named '(?P<database>.*?)' exists"
)
CONNECTION_INVALID_PORT_ERROR = re.compile("Port out of range 0-65535")
INVALID_CONNECTION_STRING_REGEX = re.compile(
"An invalid connection string attribute was specified \(failed to decrypt cipher text\)"
)
SYNTAX_ERROR_REGEX = re.compile(
r"There is a syntax error in your statement \((?P<qualifier>.*?) input '(?P<input>.*?)' expecting {.*}"
)
TABLE_DOES_NOT_EXIST_REGEX = re.compile(
"The referenced table or view '(?P<table>.*?)' does not exist"
)
COLUMN_DOES_NOT_EXIST_REGEX = re.compile(
"The reference to column '(?P<column>.*?)' is not valid"
)


# Custom datatype conversion functions


def _to_hex(data: bytes) -> str:
"""
Converts the bytes object into a string of hexadecimal digits.

:param data: the bytes object
:returns: string of hexadecimal digits representing the bytes
"""
return data.hex()


def _polygon_to_json(polygon: _STPolygon) -> str:
"""
Converts the _STPolygon object into its JSON representation.

:param data: the polygon object
:returns: JSON representation of the polygon
"""
json_value = f"{str([[p.long, p.lat] for p in polygon.exterior])}"
if polygon.holes:
for hole in polygon.holes:
json_value += f", {str([[p.long, p.lat] for p in hole])}"
json_value = f"[{json_value}]"
return json_value


def _linestring_to_json(linestring: _STLinestring) -> str:
"""
Converts the _STLinestring object into its JSON representation.

:param data: the linestring object
:returns: JSON representation of the linestring
"""
return f"{str([[p.long, p.lat] for p in linestring.points])}"


def _point_to_comma_delimited(point: _STPoint) -> str:
"""
Returns the x and y coordinates as a comma delimited string.

:param data: the point object
:returns: the x and y coordinates as a comma delimited string
"""
return f"{point.long}, {point.lat}"


# Sanitization function for column values
SanitizeFunc = Callable[[Any], Any]

# Represents a pair of a column index and the sanitization function
# to apply to its values.
PlacedSanitizeFunc = NamedTuple(
"PlacedSanitizeFunc",
[
("column_index", int),
("sanitize_func", SanitizeFunc),
],
)

# This map contains functions used to sanitize values for column types
# that cannot be processed natively by Superset.
#
# Superset serializes temporal objects using a custom serializer
# defined in superset/utils/core.py (#json_int_dttm_ser(...)). Other
# are serialized by the default JSON encoder.
_sanitized_ocient_type_codes: Dict[int, SanitizeFunc] = {
TypeCodes.BINARY: _to_hex,
TypeCodes.ST_POINT: _point_to_comma_delimited,
TypeCodes.IP: str,
TypeCodes.IPV4: str,
TypeCodes.ST_LINESTRING: _linestring_to_json,
TypeCodes.ST_POLYGON: _polygon_to_json,
}


def _find_columns_to_sanitize(cursor: Any) -> List[PlacedSanitizeFunc]:
"""
Cleans the column value for consumption by Superset.

:param cursor: the result set cursor
:returns: the list of tuples consisting of the column index and sanitization function
"""
return [
PlacedSanitizeFunc(i, _sanitized_ocient_type_codes[cursor.description[i][1]])
for i in range(len(cursor.description))
if cursor.description[i][1] in _sanitized_ocient_type_codes
]


class OcientEngineSpec(BaseEngineSpec):
engine = "ocient"
engine_name = "Ocient"
# limit_method = LimitMethod.WRAP_SQL
force_column_alias_quotes = True
max_column_name_length = 30

# Store mapping of superset Query id -> Ocient ID
# These are inserted into the cache when executing the query
# They are then removed, either upon cancellation or query completion
query_id_mapping: Dict[str, str] = dict()
query_id_mapping_lock = threading.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you also test this with async queries? (with the query being executed by Celery)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @betodealmeida , I am the Ocient developer tasked with pushing this PR over the finish line, thanks for all of your help thus far!

Is there an integration test that stresses this async queries + ocient, or just a manual look-and-pray-for-racy-condition-if-exists? Pardon my ignorance, I found this doc, but couldn't find anything w.r.t. testing.


custom_errors: Dict[Pattern[str], Tuple[str, SupersetErrorType, Dict[str, Any]]] = {
CONNECTION_INVALID_USERNAME_REGEX: (
__('The username "%(username)s" does not exist.'),
SupersetErrorType.CONNECTION_INVALID_USERNAME_ERROR,
{},
),
CONNECTION_INVALID_PASSWORD_REGEX: (
__(
"The user/password combination is not valid (Incorrect password for user)."
),
SupersetErrorType.CONNECTION_INVALID_PASSWORD_ERROR,
{},
),
CONNECTION_UNKNOWN_DATABASE_REGEX: (
__('Could not connect to database: "%(database)s"'),
SupersetErrorType.CONNECTION_UNKNOWN_DATABASE_ERROR,
{},
),
CONNECTION_INVALID_HOSTNAME_REGEX: (
__('Could not resolve hostname: "%(host)s".'),
SupersetErrorType.CONNECTION_INVALID_HOSTNAME_ERROR,
{},
),
CONNECTION_INVALID_PORT_ERROR: (
__("Port out of range 0-65535"),
SupersetErrorType.CONNECTION_INVALID_PORT_ERROR,
{},
),
INVALID_CONNECTION_STRING_REGEX: (
__(
"Invalid Connection String: Expecting String of the form 'ocient://user:pass@host:port/database'."
),
SupersetErrorType.GENERIC_DB_ENGINE_ERROR,
{},
),
SYNTAX_ERROR_REGEX: (
__('Syntax Error: %(qualifier)s input "%(input)s".'),
SupersetErrorType.SYNTAX_ERROR,
{},
),
TABLE_DOES_NOT_EXIST_REGEX: (
__('Table or View "%(table)s" does not exist.'),
SupersetErrorType.TABLE_DOES_NOT_EXIST_ERROR,
{},
),
COLUMN_DOES_NOT_EXIST_REGEX: (
__('Invalid reference to column: "%(column)s"'),
SupersetErrorType.COLUMN_DOES_NOT_EXIST_ERROR,
{},
),
}
_time_grain_expressions = {
None: "{col}",
"PT1S": "ROUND({col}, 'SECOND')",
"PT1M": "ROUND({col}, 'MINUTE')",
"PT1H": "ROUND({col}, 'HOUR')",
"P1D": "ROUND({col}, 'DAY')",
"P1W": "ROUND({col}, 'WEEK')",
"P1M": "ROUND({col}, 'MONTH')",
"P0.25Y": "ROUND({col}, 'QUARTER')",
"P1Y": "ROUND({col}, 'YEAR')",
}

@classmethod
def get_table_names(
cls, database: Database, inspector: Inspector, schema: Optional[str]
) -> List[str]:
return sorted(inspector.get_table_names(schema))

@classmethod
def fetch_data(cls, cursor, lim=None):
try:
rows = super(OcientEngineSpec, cls).fetch_data(cursor)
except Exception as exception:
with OcientEngineSpec.query_id_mapping_lock:
del OcientEngineSpec.query_id_mapping[
getattr(cursor, "superset_query_id")
]
raise exception

if len(rows) > 0 and type(rows[0]).__name__ == rows:
# Peek at the schema to determine which column values, if any,
# require sanitization.
columns_to_sanitize: List[PlacedSanitizeFunc] = _find_columns_to_sanitize(
cursor
)

if columns_to_sanitize:
# At least 1 column has to be sanitized.
def do_nothing(x):
return x

sanitization_functions = [
do_nothing for _ in range(len(cursor.description))
]
for info in columns_to_sanitize:
sanitization_functions[info.column_index] = info.sanitize_func

# Rows from pyocient are given as NamedTuple, so we need to recreate the whole table
rows = [
[sanitization_functions[i](row[i]) for i in range(len(row))]
for row in rows
]
return rows

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
# Return a Non-None value
# If None is returned, Superset will not call cancel_query
return "DUMMY_VALUE"

@classmethod
def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None:
with OcientEngineSpec.query_id_mapping_lock:
OcientEngineSpec.query_id_mapping[query.id] = cursor.query_id

# Add the query id to the cursor
setattr(cursor, "superset_query_id", query.id)
return super().handle_cursor(cursor, query, session)

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
with OcientEngineSpec.query_id_mapping_lock:
if query.id in OcientEngineSpec.query_id_mapping:
cursor.execute(f"CANCEL {OcientEngineSpec.query_id_mapping[query.id]}")
# Query has been cancelled, so we can safely remove the cursor from the cache
del OcientEngineSpec.query_id_mapping[query.id]

return True
# If the query is not in the cache, it must have either been cancelled elsewhere or completed
else:
alexclavel-ocient marked this conversation as resolved.
Show resolved Hide resolved
return False
Loading