Skip to content

Commit

Permalink
feat(database): POST, PUT, DELETE API endpoints (#10741)
Browse files Browse the repository at this point in the history
* feat(database): POST, PUT, DELETE API endpoints

* post tests

* more tests

* lint

* lint

* debug ci

* fix test

* fix test

* fix test

* fix test

* fix test

* fix test

* cleanup

* handle db connection failures

* lint

* skip hive and presto for connection fail test

* fix typo
  • Loading branch information
dpgaspar authored Sep 2, 2020
1 parent b5aecaf commit 77a3167
Show file tree
Hide file tree
Showing 18 changed files with 1,676 additions and 367 deletions.
330 changes: 241 additions & 89 deletions superset/databases/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,126 +14,78 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, List, Optional
import logging
from typing import Any, Optional

from flask import g, request, Response
from flask_appbuilder.api import expose, protect, rison, safe
from flask_appbuilder.models.sqla.interface import SQLAInterface
from marshmallow import ValidationError
from sqlalchemy.exc import NoSuchTableError, OperationalError, SQLAlchemyError

from superset import event_logger
from superset.constants import RouteMethod
from superset.databases.commands.create import CreateDatabaseCommand
from superset.databases.commands.delete import DeleteDatabaseCommand
from superset.databases.commands.exceptions import (
DatabaseCreateFailedError,
DatabaseDeleteDatasetsExistFailedError,
DatabaseDeleteFailedError,
DatabaseInvalidError,
DatabaseNotFoundError,
DatabaseUpdateFailedError,
)
from superset.databases.commands.update import UpdateDatabaseCommand
from superset.databases.decorators import check_datasource_access
from superset.databases.filters import DatabaseFilter
from superset.databases.schemas import (
database_schemas_query_schema,
DatabasePostSchema,
DatabasePutSchema,
SchemasResponseSchema,
SelectStarResponseSchema,
TableMetadataResponseSchema,
)
from superset.databases.utils import get_table_metadata
from superset.extensions import security_manager
from superset.models.core import Database
from superset.typing import FlaskResponse
from superset.utils.core import error_msg_from_exception
from superset.views.base_api import BaseSupersetModelRestApi, statsd_metrics
from superset.views.database.filters import DatabaseFilter
from superset.views.database.validators import sqlalchemy_uri_validator


def get_foreign_keys_metadata(
database: Database, table_name: str, schema_name: Optional[str]
) -> List[Dict[str, Any]]:
foreign_keys = database.get_foreign_keys(table_name, schema_name)
for fk in foreign_keys:
fk["column_names"] = fk.pop("constrained_columns")
fk["type"] = "fk"
return foreign_keys


def get_indexes_metadata(
database: Database, table_name: str, schema_name: Optional[str]
) -> List[Dict[str, Any]]:
indexes = database.get_indexes(table_name, schema_name)
for idx in indexes:
idx["type"] = "index"
return indexes


def get_col_type(col: Dict[Any, Any]) -> str:
try:
dtype = f"{col['type']}"
except Exception: # pylint: disable=broad-except
# sqla.types.JSON __str__ has a bug, so using __class__.
dtype = col["type"].__class__.__name__
return dtype


def get_table_metadata(
database: Database, table_name: str, schema_name: Optional[str]
) -> Dict[str, Any]:
"""
Get table metadata information, including type, pk, fks.
This function raises SQLAlchemyError when a schema is not found.
:param database: The database model
:param table_name: Table name
:param schema_name: schema name
:return: Dict table metadata ready for API response
"""
keys = []
columns = database.get_columns(table_name, schema_name)
primary_key = database.get_pk_constraint(table_name, schema_name)
if primary_key and primary_key.get("constrained_columns"):
primary_key["column_names"] = primary_key.pop("constrained_columns")
primary_key["type"] = "pk"
keys += [primary_key]
foreign_keys = get_foreign_keys_metadata(database, table_name, schema_name)
indexes = get_indexes_metadata(database, table_name, schema_name)
keys += foreign_keys + indexes
payload_columns: List[Dict[str, Any]] = []
for col in columns:
dtype = get_col_type(col)
payload_columns.append(
{
"name": col["name"],
"type": dtype.split("(")[0] if "(" in dtype else dtype,
"longType": dtype,
"keys": [k for k in keys if col["name"] in k["column_names"]],
}
)
return {
"name": table_name,
"columns": payload_columns,
"selectStar": database.select_star(
table_name,
schema=schema_name,
show_cols=True,
indent=True,
cols=columns,
latest_partition=True,
),
"primaryKey": primary_key,
"foreignKeys": foreign_keys,
"indexes": keys,
}
logger = logging.getLogger(__name__)


class DatabaseRestApi(BaseSupersetModelRestApi):
datamodel = SQLAInterface(Database)

include_route_methods = {
"get_list",
include_route_methods = RouteMethod.REST_MODEL_VIEW_CRUD_SET | {
"table_metadata",
"select_star",
"schemas",
}
class_permission_name = "DatabaseView"
method_permission_name = {
"get_list": "list",
"table_metadata": "list",
"select_star": "list",
"schemas": "list",
}
resource_name = "database"
allow_browser_login = True
base_filters = [["id", DatabaseFilter, lambda: []]]
show_columns = [
"id",
"database_name",
"cache_timeout",
"expose_in_sqllab",
"allow_run_async",
"allow_csv_upload",
"allow_ctas",
"allow_cvas",
"allow_dml",
"force_ctas_schema",
"allow_multi_schema_metadata_fetch",
"impersonate_user",
"encrypted_extra",
"extra",
"server_cert",
"sqlalchemy_uri",
]
list_columns = [
"id",
"database_name",
Expand All @@ -152,10 +104,30 @@ class DatabaseRestApi(BaseSupersetModelRestApi):
"backend",
"function_names",
]
add_columns = [
"database_name",
"sqlalchemy_uri",
"cache_timeout",
"expose_in_sqllab",
"allow_run_async",
"allow_csv_upload",
"allow_ctas",
"allow_cvas",
"allow_dml",
"force_ctas_schema",
"impersonate_user",
"allow_multi_schema_metadata_fetch",
"extra",
"encrypted_extra",
"server_cert",
]
edit_columns = add_columns

list_select_columns = list_columns + ["extra", "sqlalchemy_uri", "password"]
# Removes the local limit for the page size
max_page_size = -1
validators_columns = {"sqlalchemy_uri": sqlalchemy_uri_validator}
add_model_schema = DatabasePostSchema()
edit_model_schema = DatabasePutSchema()

apispec_parameter_schemas = {
"database_schemas_query_schema": database_schemas_query_schema,
Expand All @@ -167,6 +139,186 @@ class DatabaseRestApi(BaseSupersetModelRestApi):
SchemasResponseSchema,
)

@expose("/", methods=["POST"])
@protect()
@safe
@statsd_metrics
def post(self) -> Response:
"""Creates a new Database
---
post:
description: >-
Create a new Database.
requestBody:
description: Database schema
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/{{self.__class__.__name__}}.post'
responses:
201:
description: Database added
content:
application/json:
schema:
type: object
properties:
id:
type: number
result:
$ref: '#/components/schemas/{{self.__class__.__name__}}.post'
302:
description: Redirects to the current digest
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
404:
$ref: '#/components/responses/404'
500:
$ref: '#/components/responses/500'
"""
if not request.is_json:
return self.response_400(message="Request is not JSON")
try:
item = self.add_model_schema.load(request.json)
# This validates custom Schema with custom validations
except ValidationError as error:
return self.response_400(message=error.messages)
try:
new_model = CreateDatabaseCommand(g.user, item).run()
# Return censored version for sqlalchemy URI
item["sqlalchemy_uri"] = new_model.sqlalchemy_uri
return self.response(201, id=new_model.id, result=item)
except DatabaseInvalidError as ex:
return self.response_422(message=ex.normalized_messages())
except DatabaseCreateFailedError as ex:
logger.error(
"Error creating model %s: %s", self.__class__.__name__, str(ex)
)
return self.response_422(message=str(ex))

@expose("/<int:pk>", methods=["PUT"])
@protect()
@safe
@statsd_metrics
def put( # pylint: disable=too-many-return-statements, arguments-differ
self, pk: int
) -> Response:
"""Changes a Database
---
put:
description: >-
Changes a Database.
parameters:
- in: path
schema:
type: integer
name: pk
requestBody:
description: Database schema
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/{{self.__class__.__name__}}.put'
responses:
200:
description: Database changed
content:
application/json:
schema:
type: object
properties:
id:
type: number
result:
$ref: '#/components/schemas/{{self.__class__.__name__}}.put'
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
403:
$ref: '#/components/responses/403'
404:
$ref: '#/components/responses/404'
422:
$ref: '#/components/responses/422'
500:
$ref: '#/components/responses/500'
"""
if not request.is_json:
return self.response_400(message="Request is not JSON")
try:
item = self.edit_model_schema.load(request.json)
# This validates custom Schema with custom validations
except ValidationError as error:
return self.response_400(message=error.messages)
try:
changed_model = UpdateDatabaseCommand(g.user, pk, item).run()
# Return censored version for sqlalchemy URI
item["sqlalchemy_uri"] = changed_model.sqlalchemy_uri
return self.response(200, id=changed_model.id, result=item)
except DatabaseNotFoundError:
return self.response_404()
except DatabaseInvalidError as ex:
return self.response_422(message=ex.normalized_messages())
except DatabaseUpdateFailedError as ex:
logger.error(
"Error updating model %s: %s", self.__class__.__name__, str(ex)
)
return self.response_422(message=str(ex))

@expose("/<int:pk>", methods=["DELETE"])
@protect()
@safe
@statsd_metrics
def delete(self, pk: int) -> Response: # pylint: disable=arguments-differ
"""Deletes a Database
---
delete:
description: >-
Deletes a Database.
parameters:
- in: path
schema:
type: integer
name: pk
responses:
200:
description: Database deleted
content:
application/json:
schema:
type: object
properties:
message:
type: string
401:
$ref: '#/components/responses/401'
403:
$ref: '#/components/responses/403'
404:
$ref: '#/components/responses/404'
422:
$ref: '#/components/responses/422'
500:
$ref: '#/components/responses/500'
"""
try:
DeleteDatabaseCommand(g.user, pk).run()
return self.response(200, message="OK")
except DatabaseNotFoundError:
return self.response_404()
except DatabaseDeleteDatasetsExistFailedError as ex:
return self.response_422(message=str(ex))
except DatabaseDeleteFailedError as ex:
logger.error(
"Error deleting model %s: %s", self.__class__.__name__, str(ex)
)
return self.response_422(message=str(ex))

@expose("/<int:pk>/schemas/")
@protect()
@safe
Expand Down
Loading

0 comments on commit 77a3167

Please sign in to comment.