Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix] _add_filters_from_pre_query doesn't handle dim specs #3974

Merged
merged 2 commits into from
Dec 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 66 additions & 32 deletions superset/connectors/druid/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,9 @@ def values_for_column(self,
column_name,
limit=10000):
"""Retrieve some values for the given column"""
logging.info(
'Getting values for columns [{}] limited to [{}]'
.format(column_name, limit))
# TODO: Use Lexicographic TopNMetricSpec once supported by PyDruid
if self.fetch_values_from:
from_dttm = utils.parse_human_datetime(self.fetch_values_from)
Expand Down Expand Up @@ -886,6 +889,37 @@ def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
ret = Filter(type='and', fields=[ff, dim_filter])
return ret

def get_aggregations(self, all_metrics):
    """Collect the aggregation definitions for the requested metrics.

    :param all_metrics: collection of metric names to keep; only
        membership tests (``in``) are performed on it.
    :returns: an ``OrderedDict`` mapping ``metric_name`` to the metric's
        ``json_obj`` for every entry of ``self.metrics`` whose name is
        in ``all_metrics``.
    """
    # Iterate over self.metrics (not all_metrics) so the resulting order
    # follows the datasource's own metric order.
    return OrderedDict(
        (metric.metric_name, metric.json_obj)
        for metric in self.metrics
        if metric.metric_name in all_metrics
    )

def check_restricted_metrics(self, aggregations):
    """Refuse the query if it uses a restricted metric without access.

    :param aggregations: mapping keyed by metric name; only its keys are
        consulted.
    :raises MetricPermException: when at least one entry of
        ``self.metrics`` is restricted, is present in ``aggregations``
        and ``sm.has_access('metric_access', perm)`` is falsy for it.
        The message lists every such metric name.
    """
    # Condition order matters: the permission lookup is only reached for
    # restricted metrics that are actually part of the query.
    rejected_metrics = [
        m.metric_name for m in self.metrics
        if m.is_restricted and
        m.metric_name in aggregations and
        not sm.has_access('metric_access', m.perm)
    ]
    if rejected_metrics:
        raise MetricPermException(
            'Access to the metrics denied: ' + ', '.join(rejected_metrics),
        )

def get_dimensions(self, groupby, columns_dict):
    """Expand group-by column names into the dimensions list for a query.

    :param groupby: iterable of column names requested for grouping.
    :param columns_dict: mapping of column name to column object; each
        object is read for its ``dimension_spec`` attribute.
    :returns: a list with one entry per name of ``groupby`` that is a key
        of ``columns_dict``, in the original order: the column's
        ``dimension_spec`` when it is truthy, otherwise the plain name.
        Names not present in ``columns_dict`` are dropped.
    """
    dimensions = []
    for column_name in groupby:
        if column_name not in columns_dict:
            continue
        col = columns_dict[column_name]
        # A falsy column object is treated like a column without a spec.
        dim_spec = col.dimension_spec if col else None
        dimensions.append(dim_spec or column_name)
    return dimensions

def run_query( # noqa / druid
self,
groupby, metrics,
Expand Down Expand Up @@ -919,40 +953,17 @@ def run_query( # noqa / druid

query_str = ''
metrics_dict = {m.metric_name: m for m in self.metrics}

columns_dict = {c.column_name: c for c in self.columns}

all_metrics, post_aggs = self._metrics_and_post_aggs(
metrics,
metrics_dict)

aggregations = OrderedDict()
for m in self.metrics:
if m.metric_name in all_metrics:
aggregations[m.metric_name] = m.json_obj

rejected_metrics = [
m.metric_name for m in self.metrics
if m.is_restricted and
m.metric_name in aggregations.keys() and
not sm.has_access('metric_access', m.perm)
]

if rejected_metrics:
raise MetricPermException(
'Access to the metrics denied: ' + ', '.join(rejected_metrics),
)
aggregations = self.get_aggregations(all_metrics)
self.check_restricted_metrics(aggregations)

# the dimensions list with dimensionSpecs expanded
dimensions = []
groupby = [gb for gb in groupby if gb in columns_dict]
for column_name in groupby:
col = columns_dict.get(column_name)
dim_spec = col.dimension_spec
if dim_spec:
dimensions.append(dim_spec)
else:
dimensions.append(column_name)
dimensions = self.get_dimensions(groupby, columns_dict)
extras = extras or {}
qry = dict(
datasource=self.datasource_name,
Expand All @@ -974,17 +985,20 @@ def run_query( # noqa / druid
having_filters = self.get_having_filters(extras.get('having_druid'))
if having_filters:
qry['having'] = having_filters

order_direction = 'descending' if order_desc else 'ascending'

if len(groupby) == 0 and not having_filters:
logging.info('Running timeseries query for no groupby values')
del qry['dimensions']
client.timeseries(**qry)
elif (
not having_filters and
len(groupby) == 1 and
order_desc and
not isinstance(list(qry.get('dimensions'))[0], dict)
order_desc
):
dim = list(qry.get('dimensions'))[0]
logging.info('Running two-phase topn query for dimension [{}]'.format(dim))
if timeseries_limit_metric:
order_by = timeseries_limit_metric
else:
Expand All @@ -995,9 +1009,14 @@ def run_query( # noqa / druid
pre_qry['threshold'] = min(row_limit,
timeseries_limit or row_limit)
pre_qry['metric'] = order_by
pre_qry['dimension'] = dim
if isinstance(dim, dict):
if 'dimension' in dim:
pre_qry['dimension'] = dim['dimension']
else:
pre_qry['dimension'] = dim
del pre_qry['dimensions']
client.topn(**pre_qry)
logging.info('Phase 1 Complete')
query_str += '// Two phase query\n// Phase 1\n'
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
Expand All @@ -1009,19 +1028,22 @@ def run_query( # noqa / druid
df = client.export_pandas()
qry['filter'] = self._add_filter_from_pre_query_data(
df,
qry['dimensions'], filters)
[pre_qry['dimension']],
filters)
qry['threshold'] = timeseries_limit or 1000
if row_limit and granularity == 'all':
qry['threshold'] = row_limit
qry['dimension'] = list(qry.get('dimensions'))[0]
qry['dimension'] = dim
del qry['dimensions']
qry['metric'] = list(qry['aggregations'].keys())[0]
client.topn(**qry)
logging.info('Phase 2 Complete')
elif len(groupby) > 0:
# If grouping on multiple fields or using a having filter
# we have to force a groupby query
logging.info('Running groupby query for dimensions [{}]'.format(dimensions))
if timeseries_limit and is_timeseries:
logging.info('Running two-phase query for timeseries')
order_by = metrics[0] if metrics else self.metrics[0]
if timeseries_limit_metric:
order_by = timeseries_limit_metric
Expand All @@ -1039,7 +1061,18 @@ def run_query( # noqa / druid
'direction': order_direction,
}],
}
pre_qry_dims = []
# Replace dimensions specs with their `dimension`
# values, and ignore those without
for dim in qry['dimensions']:
if isinstance(dim, dict):
if 'dimension' in dim:
pre_qry_dims.append(dim['dimension'])
else:
pre_qry_dims.append(dim)
pre_qry['dimensions'] = list(set(pre_qry_dims))
client.groupby(**pre_qry)
logging.info('Phase 1 Complete')
query_str += '// Two phase query\n// Phase 1\n'
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
Expand All @@ -1051,7 +1084,7 @@ def run_query( # noqa / druid
df = client.export_pandas()
qry['filter'] = self._add_filter_from_pre_query_data(
df,
qry['dimensions'],
pre_qry['dimensions'],
filters,
)
qry['limit_spec'] = None
Expand All @@ -1066,6 +1099,7 @@ def run_query( # noqa / druid
}],
}
client.groupby(**qry)
logging.info('Query Complete')
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
return query_str
Expand Down
22 changes: 21 additions & 1 deletion superset/connectors/druid/views.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
import json
import logging

from flask import flash, Markup, redirect
Expand Down Expand Up @@ -61,9 +62,28 @@ class DruidColumnInlineView(CompactCRUDMixin, SupersetModelView): # noqa
True),
}

def pre_update(self, col):
# If a dimension spec JSON is given, ensure that it is
# valid JSON and that `outputName` is specified
if col.dimension_spec_json:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I like the validation here in general, I feel like it's a bit arbitrary that we validate that
dimension_spec_json is valid but we don't do any validation for SQL columns or SQL/Druid metrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that as well. Should we start adding validation (or 'warnings') for other things too?

try:
dimension_spec = json.loads(col.dimension_spec_json)
except ValueError as e:
raise ValueError('Invalid Dimension Spec JSON: ' + str(e))
if not isinstance(dimension_spec, dict):
raise ValueError('Dimension Spec must be a JSON object')
if 'outputName' not in dimension_spec:
raise ValueError('Dimension Spec does not contain `outputName`')
if 'dimension' not in dimension_spec:
raise ValueError('Dimension Spec is missing `dimension`')
# `outputName` should be the same as the `column_name`
if dimension_spec['outputName'] != col.column_name:
raise ValueError(
'`outputName` [{}] unequal to `column_name` [{}]'
.format(dimension_spec['outputName'], col.column_name))

def post_update(self, col):
    """Hook run after a column is updated.

    Calls ``col.generate_metrics()`` and then passes
    ``col.dimension_spec_json`` to ``utils.validate_json``.

    :param col: the column object that was just saved.
    """
    col.generate_metrics()
    # Read the attribute after generate_metrics(), as the original does.
    utils.validate_json(col.dimension_spec_json)

def post_add(self, col):
    """Hook run after a column is added; delegates to ``post_update``.

    :param col: the column object that was just created.
    """
    self.post_update(col)
Expand Down
16 changes: 9 additions & 7 deletions tests/druid_func_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ def test_run_query_single_groupby(self):
self.assertIn('dimensions', client.groupby.call_args_list[0][1])
self.assertEqual(['col1'], client.groupby.call_args_list[0][1]['dimensions'])
# order_desc but timeseries and dimension spec
spec = {'spec': 1}
# calls topn with single dimension spec 'dimension'
spec = {'outputName': 'hello', 'dimension': 'matcho'}
spec_json = json.dumps(spec)
col3 = DruidColumn(column_name='col3', dimension_spec_json=spec_json)
ds.columns.append(col3)
Expand All @@ -224,13 +225,14 @@ def test_run_query_single_groupby(self):
client=client, order_desc=True, timeseries_limit=5,
filter=[], row_limit=100,
)
self.assertEqual(0, len(client.topn.call_args_list))
self.assertEqual(2, len(client.groupby.call_args_list))
self.assertEqual(2, len(client.topn.call_args_list))
self.assertEqual(0, len(client.groupby.call_args_list))
self.assertEqual(0, len(client.timeseries.call_args_list))
self.assertIn('dimensions', client.groupby.call_args_list[0][1])
self.assertIn('dimensions', client.groupby.call_args_list[1][1])
self.assertEqual([spec], client.groupby.call_args_list[0][1]['dimensions'])
self.assertEqual([spec], client.groupby.call_args_list[1][1]['dimensions'])
self.assertIn('dimension', client.topn.call_args_list[0][1])
self.assertIn('dimension', client.topn.call_args_list[1][1])
# uses dimension for pre query and full spec for final query
self.assertEqual('matcho', client.topn.call_args_list[0][1]['dimension'])
self.assertEqual(spec, client.topn.call_args_list[1][1]['dimension'])

def test_run_query_multiple_groupby(self):
client = Mock()
Expand Down