
Commit

Working POC
> columns are loading into page
hughhhh authored Jun 7, 2022
1 parent 6fc435d commit 632ce21
Showing 8 changed files with 243 additions and 45 deletions.
@@ -29,6 +29,7 @@ export default class DatasourceKey {
     this.id = parseInt(idStr, 10);
     this.type =
       typeStr === 'table' ? DatasourceType.Table : DatasourceType.Druid;
+    this.type = typeStr === 'query' ? DatasourceType.Query : this.type;
   }

   public toString() {
@@ -22,6 +22,7 @@ import { Metric } from './Metric';
 export enum DatasourceType {
   Table = 'table',
   Druid = 'druid',
+  Query = 'query',
 }

 /**
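The two frontend changes above add 'query' as a first-class datasource type. Below is a minimal Python sketch of the parsing rule the DatasourceKey constructor applies, assuming the usual `<id>__<type>` key format; the helper is illustrative, not Superset code:

```python
from enum import Enum
from typing import Tuple


class DatasourceType(Enum):
    TABLE = "table"
    DRUID = "druid"
    QUERY = "query"


def parse_datasource_key(key: str) -> Tuple[int, DatasourceType]:
    """Split a key like '42__query' into (42, DatasourceType.QUERY)."""
    id_str, type_str = key.split("__")
    if type_str == "query":
        # the new branch added by this commit
        return int(id_str), DatasourceType.QUERY
    # pre-existing behavior: anything that is not 'table' falls back to Druid
    return int(id_str), (
        DatasourceType.TABLE if type_str == "table" else DatasourceType.DRUID
    )


assert parse_datasource_key("42__query") == (42, DatasourceType.QUERY)
```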
9 changes: 7 additions & 2 deletions superset/common/query_context_factory.py
@@ -80,6 +80,11 @@ def create(

     # pylint: disable=no-self-use
     def _convert_to_model(self, datasource: DatasourceDict) -> BaseDatasource:
-        return ConnectorRegistry.get_datasource(
-            str(datasource["type"]), int(datasource["id"]), db.session
+        from superset.dao.datasource.dao import DatasourceDAO
+        from superset.utils.core import DatasourceType
+
+        return DatasourceDAO.get_datasource(
+            session=db.session,
+            datasource_type=DatasourceType(datasource["type"]),
+            datasource_id=int(datasource["id"]),
         )
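_convert_to_model now resolves datasources through a single DAO keyed by a DatasourceType enum instead of ConnectorRegistry. A toy sketch of that dispatch pattern, with stand-in classes (the real DAO's internals are not shown in this diff):

```python
from enum import Enum
from typing import Any, Dict, Type


class DatasourceType(Enum):
    TABLE = "table"
    QUERY = "query"


class DatasourceDAOSketch:
    """Stand-in for superset.dao.datasource.dao.DatasourceDAO."""

    # illustrative registry; the real DAO maps types to SQLAlchemy models
    sources: Dict[DatasourceType, Type] = {}

    @classmethod
    def get_datasource(
        cls, session: Any, datasource_type: DatasourceType, datasource_id: int
    ) -> Any:
        # one lookup point for every SIP-68 model, picked by enum value
        model = cls.sources[datasource_type]
        return session.query(model).filter_by(id=datasource_id).one_or_none()
```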
13 changes: 11 additions & 2 deletions superset/common/query_context_processor.py
@@ -116,13 +116,15 @@ def get_df_payload(
                 and col != DTTM_ALIAS
             )
         ]
+        breakpoint()
         if invalid_columns:
             raise QueryObjectValidationError(
                 _(
                     "Columns missing in datasource: %(invalid_columns)s",
                     invalid_columns=invalid_columns,
                 )
             )
+
         query_result = self.get_query_result(query_obj)
         annotation_data = self.get_annotation_data(query_obj)
         cache.set_query_result(
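Aside from the stray breakpoint() (a debugging leftover in this POC), the surrounding block rejects queries that ask for columns the datasource does not expose. A standalone sketch of that check; DTTM_ALIAS is Superset's reserved time-column alias:

```python
from typing import List, Sequence

DTTM_ALIAS = "__timestamp"


def find_invalid_columns(
    requested: Sequence[str], datasource_columns: Sequence[str]
) -> List[str]:
    """Return requested columns the datasource cannot provide."""
    available = set(datasource_columns)
    return [col for col in requested if col not in available and col != DTTM_ALIAS]


assert find_invalid_columns(["gender", "nope"], ["gender", "year"]) == ["nope"]
assert find_invalid_columns([DTTM_ALIAS], []) == []  # the alias is always allowed
```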
@@ -183,8 +185,15 @@ def get_query_result(self, query_object: QueryObject) -> QueryResult:
         # support multiple queries from different data sources.

         # The datasource here can be different backend but the interface is common
-        result = query_context.datasource.query(query_object.to_dict())
-        query = result.query + ";\n\n"
+        from superset.models.sql_lab import Query
+
+        query = ""
+        if isinstance(query_context.datasource, Query):
+            # todo(hugh): add logic to manage all sip68 models here
+            result = query_context.datasource.exc_query(query_object.to_dict())
+        else:
+            result = query_context.datasource.query(query_object.to_dict())
+            query = result.query + ";\n\n"

         df = result.df
         # Transform the timestamp we received from database to pandas supported
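get_query_result now branches on the datasource's concrete class: a SQL Lab Query executes through the new exc_query, while every other datasource keeps the old query path (and keeps contributing its SQL to the payload). A minimal sketch of the dispatch, with stand-in classes:

```python
class ResultStub:
    def __init__(self, query: str = ""):
        self.query = query


class LegacyDatasource:
    def query(self, payload: dict) -> ResultStub:
        return ResultStub("SELECT 1")


class QueryModelStub:  # stand-in for superset.models.sql_lab.Query
    def exc_query(self, payload: dict) -> ResultStub:
        return ResultStub()


def get_result(datasource, payload: dict):
    query = ""
    if isinstance(datasource, QueryModelStub):
        result = datasource.exc_query(payload)  # new SIP-68 path
    else:
        result = datasource.query(payload)      # existing path
        query = result.query + ";\n\n"
    return result, query


assert get_result(LegacyDatasource(), {})[1].startswith("SELECT 1")
```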
50 changes: 25 additions & 25 deletions superset/models/helpers.py
@@ -638,11 +638,7 @@ def cache_timeout(self):

     @property
     def column_names(self):
-        return ["ethnic_minority", "gender"]
-
-    @property
-    def columns(self):
-        return ["<col_name>"]
+        return [col.get('column_name') for col in self.columns]

     @property
     def offset(self):
@@ -728,6 +724,10 @@ def exc_query(self, qry: Any) -> QueryResult:
         # todo(hugh): apply filters for extended query
         query_str_ext = self.get_query_str_extended(qry)
         sql = query_str_ext.sql
+
+        print('*****' * 5)
+
+        # sql = "select count(*) from flights"
         status = QueryStatus.SUCCESS
         errors = None
         error_message = None
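exc_query compiles the query object to SQL (via get_query_str_extended) before executing it, tracking status and errors much like the older query paths. A hedged sketch of that compile-then-execute shape; both callables here are hypothetical stand-ins, not Superset APIs:

```python
from typing import Any, Callable, Optional


def exc_query_sketch(
    compile_sql: Callable[[Any], str],
    execute_sql: Callable[[str], Any],
    qry: Any,
) -> dict:
    """Compile, run, and report status/errors, like the POC's exc_query."""
    sql = compile_sql(qry)
    status: str = "success"
    errors: Optional[list] = None
    error_message: Optional[str] = None
    df = None
    try:
        df = execute_sql(sql)
    except Exception as ex:  # the real code handles errors far more precisely
        status, errors, error_message = "failed", [{"message": str(ex)}], str(ex)
    return {"df": df, "status": status, "errors": errors,
            "error_message": error_message}
```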
@@ -922,12 +922,11 @@ def get_sqla_query(  # pylint: disable=too-many-arguments,too-many-locals,too-ma
         if granularity not in self.dttm_cols and granularity is not None:
             granularity = self.main_dttm_col

-        # columns_by_name: Dict[str, sa.Table] = {
-        #     col.column_name: col for col in self.columns
-        # }
-        # todo(hugh): fix this
-        columns_by_name = {}
+
+        columns_by_name = {
+            col.get('column_name'): col for col in self.columns
+        }

         # todo(hugh): how are we handling metrics
         # metrics_by_name: Dict[str, Column] = {  # todo column vs metric?
         #     m.metric_name: m for m in self.metrics
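Both helpers.py changes above assume self.columns now yields plain dicts (the shape stored in the Query model's extra JSON) rather than ORM column objects, so lookups move from attribute access to .get(). For example:

```python
# column dicts shaped like the ones the Query model stores in its extra JSON
columns = [
    {"column_name": "gender", "type": "VARCHAR"},
    {"column_name": "year", "type": "BIGINT"},
]

column_names = [col.get("column_name") for col in columns]
columns_by_name = {col.get("column_name"): col for col in columns}

assert column_names == ["gender", "year"]
assert columns_by_name["year"]["type"] == "BIGINT"
```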
@@ -1023,26 +1022,27 @@ def get_sqla_query(  # pylint: disable=too-many-arguments,too-many-locals,too-ma
                         # template_processor=template_processor,
                     )
                     # if groupby field equals a selected column
-                    elif selected in columns_by_name:
-                        outer = columns_by_name[selected].get_sqla_col()
-                    else:
-                        outer = literal_column(f"({selected})")
-                        outer = self.make_sqla_column_compatible(outer, selected)
+                    # elif selected in columns_by_name:
+                    #     outer = columns_by_name[selected].get_sqla_col()
+                    # else:
+                    #     outer = literal_column(f"({selected})")
+                    #     outer = self.make_sqla_column_compatible(outer, selected)
                 else:
                     outer = self.adhoc_column_to_sqla(
                         col=selected,  # template_processor=template_processor
                     )
-                groupby_all_columns[outer.name] = outer
-                if not series_column_names or outer.name in series_column_names:
-                    groupby_series_columns[outer.name] = outer
-                select_exprs.append(outer)
+                # groupby_all_columns[outer.name] = outer
+                # if not series_column_names or outer.name in series_column_names:
+                #     groupby_series_columns[outer.name] = outer
+                # select_exprs.append(outer)
         elif columns:
             for selected in columns:
-                select_exprs.append(
-                    columns_by_name[selected].get_sqla_col()
-                    if selected in columns_by_name
-                    else self.make_sqla_column_compatible(literal_column(selected))
-                )
+                # select_exprs.append(
+                #     columns_by_name[selected].get_sqla_col()
+                #     if selected in columns_by_name
+                #     else self.make_sqla_column_compatible(literal_column(selected))
+                # )
+                select_exprs.append(selected)
         metrics_exprs = []

         # todo(hugh): fix this
@@ -1093,7 +1093,7 @@ def get_sqla_query(  # pylint: disable=too-many-arguments,too-many-locals,too-ma
         if not db_engine_spec.allows_hidden_ordeby_agg:
             select_exprs = utils.remove_duplicates(select_exprs + orderby_exprs)

-        qry = sa.select(select_exprs)
+        qry = sa.select([sa.column("YEAR")])

         # todo(hugh) fix templating
         # tbl, cte = self.get_from_clause(template_processor)
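With column resolution commented out, the POC pins the select list to a single free-standing column: sa.column() builds a column reference that is not bound to any Table, so it can be selected before the new column models are wired up. A runnable illustration (1.x-style select list, as in the diff):

```python
import sqlalchemy as sa

# an unbound column reference; the name is quoted because it is upper-case
qry = sa.select([sa.column("YEAR")]).select_from(sa.table("flights"))
print(qry)
# SELECT "YEAR"
# FROM flights
```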
49 changes: 45 additions & 4 deletions superset/models/sql_lab.py
@@ -17,7 +17,7 @@
 """A collection of ORM sqlalchemy models for SQL Lab"""
 import re
 from datetime import datetime
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Type

 import simplejson as json
 import sqlalchemy as sqla
@@ -42,6 +42,7 @@
 from superset import security_manager
 from superset.models.helpers import (
     AuditMixinNullable,
+    ExploreMixin,
     ExtraJSONMixin,
     ImportExportMixin,
 )
@@ -50,14 +51,16 @@
 from superset.sqllab.limiting_factor import LimitingFactor
 from superset.utils.core import QueryStatus, user_label

+from superset.superset_typing import ResultSetColumnType

-class Query(Model, ExtraJSONMixin):
+class Query(Model, ExtraJSONMixin, ExploreMixin):
     """ORM model for SQL query
     Now that SQL Lab support multi-statement execution, an entry in this
     table may represent multiple SQL statements executed sequentially"""

     __tablename__ = "query"
+    type = "query"
     id = Column(Integer, primary_key=True)
     client_id = Column(String(11), unique=True, nullable=False)

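Mixing ExploreMixin into Query and giving it a type class attribute is what lets chart code treat a SQL Lab query like any other datasource. A reduced sketch of the pattern (mixin body illustrative; the real ExploreMixin lives in superset/models/helpers.py):

```python
class ExploreMixinSketch:
    """Anything exposing `type` and `columns` can back a chart."""

    @property
    def column_names(self):
        return [col.get("column_name") for col in self.columns]


class QuerySketch(ExploreMixinSketch):
    type = "query"  # what DatasourceType(datasource["type"]) matches on

    def __init__(self, extra: dict):
        self.extra = extra

    @property
    def columns(self):
        return self.extra.get("columns", [])


q = QuerySketch({"columns": [{"column_name": "year"}]})
assert q.column_names == ["year"]
```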
@@ -167,8 +170,42 @@ def sql_tables(self) -> List[Table]:
         return list(ParsedQuery(self.sql).tables)

     @property
-    def columns(self) -> List[Table]:
-        return self.extra.get("columns", [])
+    def columns(self) -> List[ResultSetColumnType]:
+        # todo(hughhh): move this logic into a base class
+        from superset.utils.core import GenericDataType
+        bool_types = ("BOOL",)
+        num_types = (
+            "DOUBLE",
+            "FLOAT",
+            "INT",
+            "BIGINT",
+            "NUMBER",
+            "LONG",
+            "REAL",
+            "NUMERIC",
+            "DECIMAL",
+            "MONEY",
+        )
+        date_types = ("DATE", "TIME")
+        str_types = ("VARCHAR", "STRING", "CHAR")
+        columns = []
+        for col in self.extra.get("columns", []):
+            computed_column = {**col}
+            col_type = col.get('type')
+
+            if col_type and any(map(lambda t: t in col_type.upper(), str_types)):
+                computed_column["type_generic"] = GenericDataType.STRING
+            if col_type and any(map(lambda t: t in col_type.upper(), bool_types)):
+                computed_column["type_generic"] = GenericDataType.BOOLEAN
+            if col_type and any(map(lambda t: t in col_type.upper(), num_types)):
+                computed_column["type_generic"] = GenericDataType.NUMERIC
+            if col_type and any(map(lambda t: t in col_type.upper(), date_types)):
+                computed_column["type_generic"] = GenericDataType.TEMPORAL
+
+            computed_column["column_name"] = col.get('name')
+            computed_column["groupby"] = True
+            columns.append(computed_column)
+        return columns

     def raise_for_access(self) -> None:
         """
@@ -179,6 +216,10 @@ def raise_for_access(self) -> None:

         security_manager.raise_for_access(query=self)

+    @property
+    def db_engine_spec(self) -> Type["BaseEngineSpec"]:
+        return self.database.db_engine_spec
+

 class SavedQuery(Model, AuditMixinNullable, ExtraJSONMixin, ImportExportMixin):
     """ORM model for SQL query"""
38 changes: 27 additions & 11 deletions superset/utils/core.py
@@ -1648,21 +1648,34 @@ def extract_dataframe_dtypes(
         "date": GenericDataType.TEMPORAL,
     }

-    columns_by_name = (
-        {column.column_name: column for column in datasource.columns}
-        if datasource
-        else {}
-    )
+    # todo(hughhh): can we make the column_object a Union
+    if datasource and datasource.type == "query":
+        columns_by_name = {column.get('column_name'): column for column in datasource.columns}
+    else:
+        columns_by_name = (
+            {column.column_name: column for column in datasource.columns}
+            if datasource
+            else {}
+        )

     generic_types: List[GenericDataType] = []
     for column in df.columns:
         column_object = columns_by_name.get(column)
         series = df[column]
         inferred_type = infer_dtype(series)
-        generic_type = (
-            GenericDataType.TEMPORAL
-            if column_object and column_object.is_dttm
-            else inferred_type_map.get(inferred_type, GenericDataType.STRING)
-        )
+        # todo(hughhh): can we make the column_object a Union
+        if datasource.type == "query":
+            generic_type = (
+                GenericDataType.TEMPORAL
+                if column_object and column_object.get('is_dttm')
+                else inferred_type_map.get(inferred_type, GenericDataType.STRING)
+            )
+        else:
+            generic_type = (
+                GenericDataType.TEMPORAL
+                if column_object and column_object.is_dttm
+                else inferred_type_map.get(inferred_type, GenericDataType.STRING)
+            )
         generic_types.append(generic_type)

     return generic_types
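For columns the datasource cannot describe, extract_dataframe_dtypes falls back to pandas' infer_dtype and maps the result through inferred_type_map. A small runnable look at what infer_dtype reports:

```python
import pandas as pd
from pandas.api.types import infer_dtype

df = pd.DataFrame({
    "year": [2020, 2021],
    "name": ["a", "b"],
    "when": pd.to_datetime(["2022-06-07", "2022-06-08"]),
})

for col in df.columns:
    print(col, infer_dtype(df[col]))
# year integer
# name string
# when datetime64
```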
@@ -1696,7 +1709,10 @@ def get_time_filter_status(
     datasource: "BaseDatasource",
     applied_time_extras: Dict[str, str],
 ) -> Tuple[List[Dict[str, str]], List[Dict[str, str]]]:
-    temporal_columns = {col.column_name for col in datasource.columns if col.is_dttm}
+
+    # todo(hugh): fix this
+    # temporal_columns = {col.column_name for col in datasource.columns if col.is_dttm}
+    temporal_columns = {}
     applied: List[Dict[str, str]] = []
     rejected: List[Dict[str, str]] = []
     time_column = applied_time_extras.get(ExtraFiltersTimeColumnType.TIME_COL)