chore: consolidate datasource import logic (apache#11533)
* Consolidate dash import logic

* WIP

* Add license

* Fix lint

* Retrigger tests

* Fix lint
betodealmeida authored and auxten committed Nov 20, 2020
1 parent eeb674a commit 807e145
Showing 10 changed files with 365 additions and 297 deletions.
19 changes: 7 additions & 12 deletions superset/cli.py
@@ -301,11 +301,11 @@ def export_dashboards(dashboard_file: str, print_stdout: bool) -> None:
 )
 def import_datasources(path: str, sync: str, recursive: bool) -> None:
     """Import datasources from YAML"""
-    from superset.utils import dict_import_export
+    from superset.datasets.commands.importers.v0 import ImportDatasetsCommand

     sync_array = sync.split(",")
     path_object = Path(path)
-    files = []
+    files: List[Path] = []
     if path_object.is_file():
         files.append(path_object)
     elif path_object.exists() and not recursive:
@@ -314,16 +314,11 @@ def import_datasources(path: str, sync: str, recursive: bool) -> None:
     elif path_object.exists() and recursive:
         files.extend(path_object.rglob("*.yaml"))
         files.extend(path_object.rglob("*.yml"))
-    for file_ in files:
-        logger.info("Importing datasources from file %s", file_)
-        try:
-            with file_.open() as data_stream:
-                dict_import_export.import_from_dict(
-                    db.session, yaml.safe_load(data_stream), sync=sync_array
-                )
-        except Exception as ex:  # pylint: disable=broad-except
-            logger.error("Error when importing datasources from file %s", file_)
-            logger.error(ex)
+    contents = {path.name: open(path).read() for path in files}
+    try:
+        ImportDatasetsCommand(contents, sync_array).run()
+    except Exception:  # pylint: disable=broad-except
+        logger.exception("Error when importing dataset")


 @superset.command()
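The consolidation changes the CLI's shape: instead of parsing and importing each YAML file in a loop, the command now gathers every file's text into a single `contents` dict and hands the whole batch to `ImportDatasetsCommand`. A minimal sketch of that collection step, assuming the collapsed non-recursive branch globs `*.yaml`/`*.yml` the same way the recursive branch shown above does; `collect_yaml_contents` is a hypothetical helper for illustration, not part of this commit:

```python
from pathlib import Path
from typing import Dict, List


def collect_yaml_contents(path: str, recursive: bool = False) -> Dict[str, str]:
    """Build the {file_name: yaml_text} mapping the new command consumes."""
    path_object = Path(path)
    files: List[Path] = []
    if path_object.is_file():
        files.append(path_object)
    elif path_object.exists() and not recursive:
        # assumption: mirrors the rglob branch below, one directory deep
        files.extend(path_object.glob("*.yaml"))
        files.extend(path_object.glob("*.yml"))
    elif path_object.exists() and recursive:
        files.extend(path_object.rglob("*.yaml"))
        files.extend(path_object.rglob("*.yml"))
    # read_text() closes each file; the diff's open(path).read() never does
    return {p.name: p.read_text() for p in files}
```

Note the error handling narrows along with it: one `try` around the whole batch with `logger.exception`, rather than per-file `logger.error` calls.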
58 changes: 1 addition & 57 deletions superset/connectors/druid/models.py
@@ -56,7 +56,7 @@
 from superset.models.core import Database
 from superset.models.helpers import AuditMixinNullable, ImportExportMixin, QueryResult
 from superset.typing import FilterValues, Granularity, Metric, QueryObjectDict
-from superset.utils import core as utils, import_datasource
+from superset.utils import core as utils

 try:
     import requests
@@ -378,20 +378,6 @@ def refresh_metrics(self) -> None:
             metric.datasource_id = self.datasource_id
             db.session.add(metric)

-    @classmethod
-    def import_obj(cls, i_column: "DruidColumn") -> "DruidColumn":
-        def lookup_obj(lookup_column: DruidColumn) -> Optional[DruidColumn]:
-            return (
-                db.session.query(DruidColumn)
-                .filter(
-                    DruidColumn.datasource_id == lookup_column.datasource_id,
-                    DruidColumn.column_name == lookup_column.column_name,
-                )
-                .first()
-            )
-
-        return import_datasource.import_simple_obj(db.session, i_column, lookup_obj)
-

 class DruidMetric(Model, BaseMetric):

@@ -447,20 +433,6 @@ def perm(self) -> Optional[str]:
     def get_perm(self) -> Optional[str]:
         return self.perm

-    @classmethod
-    def import_obj(cls, i_metric: "DruidMetric") -> "DruidMetric":
-        def lookup_obj(lookup_metric: DruidMetric) -> Optional[DruidMetric]:
-            return (
-                db.session.query(DruidMetric)
-                .filter(
-                    DruidMetric.datasource_id == lookup_metric.datasource_id,
-                    DruidMetric.metric_name == lookup_metric.metric_name,
-                )
-                .first()
-            )
-
-        return import_datasource.import_simple_obj(db.session, i_metric, lookup_obj)
-

 druiddatasource_user = Table(
     "druiddatasource_user",
@@ -610,34 +582,6 @@ def datasource_link(self) -> str:
     def get_metric_obj(self, metric_name: str) -> Dict[str, Any]:
         return [m.json_obj for m in self.metrics if m.metric_name == metric_name][0]

-    @classmethod
-    def import_obj(
-        cls, i_datasource: "DruidDatasource", import_time: Optional[int] = None
-    ) -> int:
-        """Imports the datasource from the object to the database.
-        Metrics and columns and datasource will be overridden if exists.
-        This function can be used to import/export dashboards between multiple
-        superset instances. Audit metadata isn't copies over.
-        """
-
-        def lookup_datasource(d: DruidDatasource) -> Optional[DruidDatasource]:
-            return (
-                db.session.query(DruidDatasource)
-                .filter(
-                    DruidDatasource.datasource_name == d.datasource_name,
-                    DruidDatasource.cluster_id == d.cluster_id,
-                )
-                .first()
-            )
-
-        def lookup_cluster(d: DruidDatasource) -> Optional[DruidCluster]:
-            return db.session.query(DruidCluster).filter_by(id=d.cluster_id).first()
-
-        return import_datasource.import_datasource(
-            db.session, i_datasource, lookup_cluster, lookup_datasource, import_time
-        )
-
     def latest_metadata(self) -> Optional[Dict[str, Any]]:
         """Returns segment metadata from the latest segment"""
         logger.info("Syncing datasource [{}]".format(self.datasource_name))
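Each deleted `import_obj` above repeats one pattern: a lookup callback that finds an existing row by its natural key (`datasource_id` plus column or metric name), passed to a shared helper that overwrites the match or inserts the incoming object. A rough self-contained sketch of that pattern, under assumption — this illustrates the duplication being consolidated into the new dataset importers, not Superset's actual helper:

```python
from typing import Callable, Optional, TypeVar

from sqlalchemy.orm import Session

T = TypeVar("T")


def import_simple_obj_sketch(
    session: Session, incoming: T, lookup: Callable[[T], Optional[T]]
) -> T:
    """Upsert `incoming`: reuse the primary key of any row `lookup` finds."""
    existing = lookup(incoming)
    if existing is not None:
        incoming.id = existing.id  # type: ignore[attr-defined]  # overwrite match
        incoming = session.merge(incoming)
    else:
        session.add(incoming)  # no match: insert as a new row
    session.flush()
    return incoming
```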
87 changes: 2 additions & 85 deletions superset/connectors/sqla/models.py
@@ -47,7 +47,6 @@
 )
 from sqlalchemy.exc import CompileError
 from sqlalchemy.orm import backref, Query, relationship, RelationshipProperty, Session
-from sqlalchemy.orm.exc import NoResultFound
 from sqlalchemy.schema import UniqueConstraint
 from sqlalchemy.sql import column, ColumnElement, literal_column, table, text
 from sqlalchemy.sql.expression import Label, Select, TextAsFrom
@@ -58,11 +57,7 @@
 from superset.constants import NULL_STRING
 from superset.db_engine_specs.base import TimestampExpression
 from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
-from superset.exceptions import (
-    DatabaseNotFound,
-    QueryObjectValidationError,
-    SupersetSecurityException,
-)
+from superset.exceptions import QueryObjectValidationError, SupersetSecurityException
 from superset.jinja_context import (
     BaseTemplateProcessor,
     ExtraCache,
@@ -74,7 +69,7 @@
 from superset.result_set import SupersetResultSet
 from superset.sql_parse import ParsedQuery
 from superset.typing import Metric, QueryObjectDict
-from superset.utils import core as utils, import_datasource
+from superset.utils import core as utils

 config = app.config
 metadata = Model.metadata  # pylint: disable=no-member
@@ -290,20 +285,6 @@ def get_timestamp_expression(
         )
         return self.table.make_sqla_column_compatible(time_expr, label)

-    @classmethod
-    def import_obj(cls, i_column: "TableColumn") -> "TableColumn":
-        def lookup_obj(lookup_column: TableColumn) -> TableColumn:
-            return (
-                db.session.query(TableColumn)
-                .filter(
-                    TableColumn.table_id == lookup_column.table_id,
-                    TableColumn.column_name == lookup_column.column_name,
-                )
-                .first()
-            )
-
-        return import_datasource.import_simple_obj(db.session, i_column, lookup_obj)
-
     def dttm_sql_literal(
         self,
         dttm: DateTime,
@@ -412,20 +393,6 @@ def perm(self) -> Optional[str]:
     def get_perm(self) -> Optional[str]:
         return self.perm

-    @classmethod
-    def import_obj(cls, i_metric: "SqlMetric") -> "SqlMetric":
-        def lookup_obj(lookup_metric: SqlMetric) -> SqlMetric:
-            return (
-                db.session.query(SqlMetric)
-                .filter(
-                    SqlMetric.table_id == lookup_metric.table_id,
-                    SqlMetric.metric_name == lookup_metric.metric_name,
-                )
-                .first()
-            )
-
-        return import_datasource.import_simple_obj(db.session, i_metric, lookup_obj)
-
     def get_extra_dict(self) -> Dict[str, Any]:
         try:
             return json.loads(self.extra)
@@ -1416,56 +1383,6 @@ def fetch_metadata(self, commit: bool = True) -> MetadataResult:
             db.session.commit()
         return results

-    @classmethod
-    def import_obj(
-        cls,
-        i_datasource: "SqlaTable",
-        database_id: Optional[int] = None,
-        import_time: Optional[int] = None,
-    ) -> int:
-        """Imports the datasource from the object to the database.
-        Metrics and columns and datasource will be overrided if exists.
-        This function can be used to import/export dashboards between multiple
-        superset instances. Audit metadata isn't copies over.
-        """
-
-        def lookup_sqlatable(table_: "SqlaTable") -> "SqlaTable":
-            return (
-                db.session.query(SqlaTable)
-                .join(Database)
-                .filter(
-                    SqlaTable.table_name == table_.table_name,
-                    SqlaTable.schema == table_.schema,
-                    Database.id == table_.database_id,
-                )
-                .first()
-            )
-
-        def lookup_database(table_: SqlaTable) -> Database:
-            try:
-                return (
-                    db.session.query(Database)
-                    .filter_by(database_name=table_.params_dict["database_name"])
-                    .one()
-                )
-            except NoResultFound:
-                raise DatabaseNotFound(
-                    _(
-                        "Database '%(name)s' is not found",
-                        name=table_.params_dict["database_name"],
-                    )
-                )
-
-        return import_datasource.import_datasource(
-            db.session,
-            i_datasource,
-            lookup_database,
-            lookup_sqlatable,
-            import_time,
-            database_id,
-        )
-
     @classmethod
     def query_datasources_by_name(
         cls,
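A detail worth noting in the deleted `SqlaTable.import_obj`: `lookup_sqlatable` uses `.first()`, which returns `None` when nothing matches, while `lookup_database` uses `.one()`, which raises `NoResultFound` (there translated into `DatabaseNotFound`). That is why both exception imports could be dropped in the hunks above. A tiny standalone demonstration of the two query idioms, using a hypothetical `User` model and SQLAlchemy 1.3-era imports:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# .first() quietly returns None on no match ...
assert session.query(User).filter_by(name="missing").first() is None

# ... while .one() raises, which the old code mapped to DatabaseNotFound
try:
    session.query(User).filter_by(name="missing").one()
except NoResultFound:
    print("no matching row")
```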
16 changes: 16 additions & 0 deletions superset/dashboards/commands/importers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
6 changes: 4 additions & 2 deletions superset/dashboards/commands/importers/v0.py
@@ -19,14 +19,15 @@
 import time
 from copy import copy
 from datetime import datetime
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Optional

 from flask_babel import lazy_gettext as _
 from sqlalchemy.orm import make_transient, Session

 from superset import ConnectorRegistry, db
 from superset.commands.base import BaseCommand
 from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
+from superset.datasets.commands.importers.v0 import import_dataset
 from superset.exceptions import DashboardImportException
 from superset.models.dashboard import Dashboard
 from superset.models.slice import Slice
@@ -301,7 +302,7 @@ def import_dashboards(
     if not data:
         raise DashboardImportException(_("No data in file"))
     for table in data["datasources"]:
-        type(table).import_obj(table, database_id, import_time=import_time)
+        import_dataset(table, database_id, import_time=import_time)
     session.commit()
     for dashboard in data["dashboards"]:
         import_dashboard(dashboard, import_time=import_time)
@@ -333,4 +334,5 @@ def validate(self) -> None:
         try:
             json.loads(content)
         except ValueError:
+            logger.exception("Invalid JSON file")
             raise
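The `ImportDatasetsCommand` called from `superset/cli.py` lives in `superset/datasets/commands/importers/v0.py`, whose diff is not loaded on this page; the `BaseCommand` import above suggests it follows the project's run/validate command convention. A hedged sketch of the interface the CLI call site implies — illustrative only, not the commit's actual implementation:

```python
from typing import Dict, List

import yaml


class ImportDatasetsCommandSketch:
    """Matches the (contents, sync) signature used by the new CLI call."""

    def __init__(self, contents: Dict[str, str], sync: List[str]) -> None:
        self.contents = contents  # {file_name: yaml_text}
        self.sync = sync  # e.g. ["columns", "metrics"]

    def validate(self) -> None:
        # assumption: every file must at least parse as YAML
        for body in self.contents.values():
            yaml.safe_load(body)

    def run(self) -> None:
        self.validate()
        for file_name, body in self.contents.items():
            config = yaml.safe_load(body)
            # the real command hands `config` to the dataset importer
            # (cf. import_dataset above), honoring the sync flags
            print(f"would import from {file_name}: {list(config)}")
```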
16 changes: 16 additions & 0 deletions superset/datasets/commands/importers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
(The remaining 4 of the 10 changed files, including the new superset/datasets/commands/importers/v0.py, were not loaded in this view.)