diff --git a/requirements.txt b/requirements.txt
index fdfde97bfd44a..e3bceee7cb9ee 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -22,7 +22,7 @@ pandas==0.22.0
 parsedatetime==2.0.0
 pathlib2==2.3.0
 polyline==1.3.2
-pydruid==0.4.1
+pydruid==0.4.2
 pyhive==0.5.0
 python-dateutil==2.6.1
 python-geohash==0.8.5
diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py
index 1b71707bbba10..374d239346944 100644
--- a/superset/connectors/druid/models.py
+++ b/superset/connectors/druid/models.py
@@ -21,7 +21,8 @@
 from flask_babel import lazy_gettext as _
 from pydruid.client import PyDruid
 from pydruid.utils.aggregators import count
-from pydruid.utils.filters import Bound, Dimension, Filter
+from pydruid.utils.dimensions import MapLookupExtraction, RegexExtraction
+from pydruid.utils.filters import Dimension, Filter
 from pydruid.utils.having import Aggregation
 from pydruid.utils.postaggregator import (
     Const, Field, HyperUniqueCardinality, Postaggregator, Quantile, Quantiles,
@@ -960,8 +961,25 @@ def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
         for unused, row in df.iterrows():
             fields = []
             for dim in dimensions:
-                f = Dimension(dim) == row[dim]
-                fields.append(f)
+                f = None
+                # Check if this dimension uses an extraction function
+                # If so, create the appropriate pydruid extraction object
+                if isinstance(dim, dict) and 'extractionFn' in dim:
+                    (col, extraction_fn) = DruidDatasource._create_extraction_fn(dim)
+                    dim_val = dim['outputName']
+                    f = Filter(
+                        dimension=col,
+                        value=row[dim_val],
+                        extraction_function=extraction_fn,
+                    )
+                elif isinstance(dim, dict):
+                    dim_val = dim['outputName']
+                    if dim_val:
+                        f = Dimension(dim_val) == row[dim_val]
+                else:
+                    f = Dimension(dim) == row[dim]
+                if f:
+                    fields.append(f)
             if len(fields) > 1:
                 term = Filter(type='and', fields=fields)
                 new_filters.append(term)
@@ -1065,7 +1083,9 @@ def _dimensions_to_values(dimensions):
         values = []
         for dimension in dimensions:
             if isinstance(dimension, dict):
-                if 'dimension' in dimension:
+                if 'extractionFn' in dimension:
+                    values.append(dimension)
+                elif 'dimension' in dimension:
                     values.append(dimension['dimension'])
                 else:
                     values.append(dimension)
@@ -1132,7 +1152,7 @@ def run_query(  # noqa / druid
             intervals=self.intervals_from_dttms(from_dttm, to_dttm),
         )
 
-        filters = DruidDatasource.get_filters(filter, self.num_cols)
+        filters = DruidDatasource.get_filters(filter, self.num_cols, columns_dict)
         if filters:
             qry['filter'] = filters
 
@@ -1217,7 +1237,14 @@ def run_query(  # noqa / druid
 
                 pre_qry = deepcopy(qry)
                 pre_qry_dims = self._dimensions_to_values(qry['dimensions'])
-                pre_qry['dimensions'] = list(set(pre_qry_dims))
+
+                # Can't use set on an array with dicts
+                # Use set with non-dict items only
+                non_dict_dims = list(
+                    set([x for x in pre_qry_dims if not isinstance(x, dict)]),
+                )
+                dict_dims = [x for x in pre_qry_dims if isinstance(x, dict)]
+                pre_qry['dimensions'] = non_dict_dims + dict_dims
 
                 order_by = metrics[0] if metrics else pre_qry_dims[0]
 
@@ -1341,8 +1368,31 @@ def increment_timestamp(ts):
             query=query_str,
             duration=datetime.now() - qry_start_dttm)
 
+    @staticmethod
+    def _create_extraction_fn(dim_spec):
+        extraction_fn = None
+        if dim_spec and 'extractionFn' in dim_spec:
+            col = dim_spec['dimension']
+            fn = dim_spec['extractionFn']
+            ext_type = fn.get('type')
+            if ext_type == 'lookup' and fn['lookup'].get('type') == 'map':
+                replace_missing_values = fn.get('replaceMissingValueWith')
+                retain_missing_values = fn.get('retainMissingValue', False)
+                injective = fn.get('isOneToOne', False)
+                extraction_fn = MapLookupExtraction(
+                    fn['lookup']['map'],
+                    replace_missing_values=replace_missing_values,
+                    retain_missing_values=retain_missing_values,
+                    injective=injective,
+                )
+            elif ext_type == 'regex':
+                extraction_fn = RegexExtraction(fn['expr'])
+            else:
+                raise Exception(_('Unsupported extraction function: ' + ext_type))
+        return (col, extraction_fn)
+
     @classmethod
-    def get_filters(cls, raw_filters, num_cols):  # noqa
+    def get_filters(cls, raw_filters, num_cols, columns_dict):  # noqa
         """Given Superset filter data structure, returns pydruid Filter(s)"""
         filters = None
         for flt in raw_filters:
@@ -1354,21 +1404,42 @@ def get_filters(cls, raw_filters, num_cols):  # noqa
                     not op or
                     (eq is None and op not in ('IS NULL', 'IS NOT NULL'))):
                 continue
+
+            # Check if this dimension uses an extraction function
+            # If so, create the appropriate pydruid extraction object
+            column_def = columns_dict.get(col)
+            dim_spec = column_def.dimension_spec if column_def else None
+            extraction_fn = None
+            if dim_spec and 'extractionFn' in dim_spec:
+                (col, extraction_fn) = DruidDatasource._create_extraction_fn(dim_spec)
+
             cond = None
             is_numeric_col = col in num_cols
             is_list_target = op in ('in', 'not in')
             eq = cls.filter_values_handler(
                 eq, is_list_target=is_list_target,
                 target_column_is_numeric=is_numeric_col)
+
+            # For these two ops, could have used Dimension,
+            # but it doesn't support extraction functions
             if op == '==':
-                cond = Dimension(col) == eq
+                cond = Filter(dimension=col, value=eq, extraction_function=extraction_fn)
            elif op == '!=':
-                cond = Dimension(col) != eq
+                cond = ~Filter(dimension=col, value=eq, extraction_function=extraction_fn)
             elif op in ('in', 'not in'):
                 fields = []
                 # ignore the filter if it has no value
                 if not len(eq):
                     continue
+                # if it uses an extraction fn, use the "in" operator
+                # as Dimension isn't supported
+                elif extraction_fn is not None:
+                    cond = Filter(
+                        dimension=col,
+                        values=eq,
+                        type='in',
+                        extraction_function=extraction_fn,
+                    )
                 elif len(eq) == 1:
                     cond = Dimension(col) == eq[0]
                 else:
@@ -1378,20 +1449,58 @@ def get_filters(cls, raw_filters, num_cols):  # noqa
                 if op == 'not in':
                     cond = ~cond
             elif op == 'regex':
-                cond = Filter(type='regex', pattern=eq, dimension=col)
+                cond = Filter(
+                    extraction_function=extraction_fn,
+                    type='regex',
+                    pattern=eq,
+                    dimension=col,
+                )
+
+            # For the ops below, could have used pydruid's Bound,
+            # but it doesn't support extraction functions
             elif op == '>=':
-                cond = Bound(col, eq, None, alphaNumeric=is_numeric_col)
+                cond = Filter(
+                    type='bound',
+                    extraction_function=extraction_fn,
+                    dimension=col,
+                    lowerStrict=False,
+                    upperStrict=False,
+                    lower=eq,
+                    upper=None,
+                    alphaNumeric=is_numeric_col,
+                )
             elif op == '<=':
-                cond = Bound(col, None, eq, alphaNumeric=is_numeric_col)
+                cond = Filter(
+                    type='bound',
+                    extraction_function=extraction_fn,
+                    dimension=col,
+                    lowerStrict=False,
+                    upperStrict=False,
+                    lower=None,
+                    upper=eq,
+                    alphaNumeric=is_numeric_col,
+                )
             elif op == '>':
-                cond = Bound(
-                    col, eq, None,
-                    lowerStrict=True, alphaNumeric=is_numeric_col,
+                cond = Filter(
+                    type='bound',
+                    extraction_function=extraction_fn,
+                    lowerStrict=True,
+                    upperStrict=False,
+                    dimension=col,
+                    lower=eq,
+                    upper=None,
+                    alphaNumeric=is_numeric_col,
                 )
             elif op == '<':
-                cond = Bound(
-                    col, None, eq,
-                    upperStrict=True, alphaNumeric=is_numeric_col,
+                cond = Filter(
+                    type='bound',
+                    extraction_function=extraction_fn,
+                    upperStrict=True,
+                    lowerStrict=False,
+                    dimension=col,
+                    lower=None,
+                    upper=eq,
+                    alphaNumeric=is_numeric_col,
                 )
             elif op == 'IS NULL':
                 cond = Dimension(col) == None  # NOQA
diff --git a/tests/druid_func_tests.py b/tests/druid_func_tests.py
index c367bd7ad7bd7..64543a8edb098 100644
--- a/tests/druid_func_tests.py
+++ b/tests/druid_func_tests.py
@@ -8,8 +8,10 @@
 import unittest
 
 from mock import Mock
+from pydruid.utils.dimensions import MapLookupExtraction, RegexExtraction
 import pydruid.utils.postaggregator as postaggs
 
+
 import superset.connectors.druid.models as models
 from superset.connectors.druid.models import (
     DruidColumn, DruidDatasource, DruidMetric,
@@ -31,14 +33,84 @@ def emplace(metrics_dict, metric_name, is_postagg=False):
 # Unit tests that can be run without initializing base tests
 class DruidFuncTestCase(unittest.TestCase):
 
+    def test_get_filters_extraction_fn_map(self):
+        filters = [{'col': 'deviceName', 'val': ['iPhone X'], 'op': 'in'}]
+        dimension_spec = {
+            'type': 'extraction',
+            'dimension': 'device',
+            'outputName': 'deviceName',
+            'outputType': 'STRING',
+            'extractionFn': {
+                'type': 'lookup',
+                'dimension': 'dimensionName',
+                'outputName': 'dimensionOutputName',
+                'replaceMissingValueWith': 'missing_value',
+                'retainMissingValue': False,
+                'lookup': {
+                    'type': 'map',
+                    'map': {
+                        'iPhone10,1': 'iPhone 8',
+                        'iPhone10,4': 'iPhone 8',
+                        'iPhone10,2': 'iPhone 8 Plus',
+                        'iPhone10,5': 'iPhone 8 Plus',
+                        'iPhone10,3': 'iPhone X',
+                        'iPhone10,6': 'iPhone X',
+                    },
+                    'isOneToOne': False,
+                },
+            },
+        }
+        spec_json = json.dumps(dimension_spec)
+        col = DruidColumn(column_name='deviceName', dimension_spec_json=spec_json)
+        column_dict = {'deviceName': col}
+        f = DruidDatasource.get_filters(filters, [], column_dict)
+        assert isinstance(f.extraction_function, MapLookupExtraction)
+        dim_ext_fn = dimension_spec['extractionFn']
+        f_ext_fn = f.extraction_function
+        self.assertEqual(dim_ext_fn['lookup']['map'], f_ext_fn._mapping)
+        self.assertEqual(dim_ext_fn['lookup']['isOneToOne'], f_ext_fn._injective)
+        self.assertEqual(
+            dim_ext_fn['replaceMissingValueWith'],
+            f_ext_fn._replace_missing_values,
+        )
+        self.assertEqual(
+            dim_ext_fn['retainMissingValue'],
+            f_ext_fn._retain_missing_values,
+        )
+
+    def test_get_filters_extraction_fn_regex(self):
+        filters = [{'col': 'buildPrefix', 'val': ['22B'], 'op': 'in'}]
+        dimension_spec = {
+            'type': 'extraction',
+            'dimension': 'build',
+            'outputName': 'buildPrefix',
+            'outputType': 'STRING',
+            'extractionFn': {
+                'type': 'regex',
+                'expr': '(^[0-9A-Za-z]{3})',
+            },
+        }
+        spec_json = json.dumps(dimension_spec)
+        col = DruidColumn(column_name='buildPrefix', dimension_spec_json=spec_json)
+        column_dict = {'buildPrefix': col}
+        f = DruidDatasource.get_filters(filters, [], column_dict)
+        assert isinstance(f.extraction_function, RegexExtraction)
+        dim_ext_fn = dimension_spec['extractionFn']
+        f_ext_fn = f.extraction_function
+        self.assertEqual(dim_ext_fn['expr'], f_ext_fn._expr)
+
     def test_get_filters_ignores_invalid_filter_objects(self):
         filtr = {'col': 'col1', 'op': '=='}
         filters = [filtr]
-        self.assertIsNone(DruidDatasource.get_filters(filters, []))
+        col = DruidColumn(column_name='col1')
+        column_dict = {'col1': col}
+        self.assertIsNone(DruidDatasource.get_filters(filters, [], column_dict))
 
     def test_get_filters_constructs_filter_in(self):
         filtr = {'col': 'A', 'op': 'in', 'val': ['a', 'b', 'c']}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertIn('filter', res.filter)
         self.assertIn('fields', res.filter['filter'])
         self.assertEqual('or', res.filter['filter']['type'])
@@ -46,7 +118,9 @@ def test_get_filters_constructs_filter_in(self):
 
     def test_get_filters_constructs_filter_not_in(self):
         filtr = {'col': 'A', 'op': 'not in', 'val': ['a', 'b', 'c']}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertIn('filter', res.filter)
         self.assertIn('type', res.filter['filter'])
         self.assertEqual('not', res.filter['filter']['type'])
@@ -58,14 +132,18 @@ def test_get_filters_constructs_filter_not_in(self):
 
     def test_get_filters_constructs_filter_equals(self):
         filtr = {'col': 'A', 'op': '==', 'val': 'h'}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('selector', res.filter['filter']['type'])
         self.assertEqual('A', res.filter['filter']['dimension'])
         self.assertEqual('h', res.filter['filter']['value'])
 
     def test_get_filters_constructs_filter_not_equals(self):
         filtr = {'col': 'A', 'op': '!=', 'val': 'h'}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('not', res.filter['filter']['type'])
         self.assertEqual(
             'h',
@@ -74,25 +152,29 @@ def test_get_filters_constructs_filter_not_equals(self):
 
     def test_get_filters_constructs_bounds_filter(self):
         filtr = {'col': 'A', 'op': '>=', 'val': 'h'}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertFalse(res.filter['filter']['lowerStrict'])
         self.assertEqual('A', res.filter['filter']['dimension'])
         self.assertEqual('h', res.filter['filter']['lower'])
         self.assertFalse(res.filter['filter']['alphaNumeric'])
         filtr['op'] = '>'
-        res = DruidDatasource.get_filters([filtr], [])
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertTrue(res.filter['filter']['lowerStrict'])
         filtr['op'] = '<='
-        res = DruidDatasource.get_filters([filtr], [])
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertFalse(res.filter['filter']['upperStrict'])
         self.assertEqual('h', res.filter['filter']['upper'])
         filtr['op'] = '<'
-        res = DruidDatasource.get_filters([filtr], [])
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
        self.assertTrue(res.filter['filter']['upperStrict'])
 
     def test_get_filters_constructs_regex_filter(self):
         filtr = {'col': 'A', 'op': 'regex', 'val': '[abc]'}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('regex', res.filter['filter']['type'])
         self.assertEqual('[abc]', res.filter['filter']['pattern'])
         self.assertEqual('A', res.filter['filter']['dimension'])
@@ -100,46 +182,62 @@ def test_get_filters_constructs_regex_filter(self):
     def test_get_filters_composes_multiple_filters(self):
         filtr1 = {'col': 'A', 'op': '!=', 'val': 'y'}
         filtr2 = {'col': 'B', 'op': 'in', 'val': ['a', 'b', 'c']}
-        res = DruidDatasource.get_filters([filtr1, filtr2], [])
+        cola = DruidColumn(column_name='A')
+        colb = DruidColumn(column_name='B')
+        column_dict = {'A': cola, 'B': colb}
+        res = DruidDatasource.get_filters([filtr1, filtr2], [], column_dict)
         self.assertEqual('and', res.filter['filter']['type'])
         self.assertEqual(2, len(res.filter['filter']['fields']))
 
     def test_get_filters_ignores_in_not_in_with_empty_value(self):
         filtr1 = {'col': 'A', 'op': 'in', 'val': []}
         filtr2 = {'col': 'A', 'op': 'not in', 'val': []}
-        res = DruidDatasource.get_filters([filtr1, filtr2], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr1, filtr2], [], column_dict)
         self.assertIsNone(res)
 
     def test_get_filters_constructs_equals_for_in_not_in_single_value(self):
         filtr = {'col': 'A', 'op': 'in', 'val': ['a']}
-        res = DruidDatasource.get_filters([filtr], [])
+        cola = DruidColumn(column_name='A')
+        colb = DruidColumn(column_name='B')
+        column_dict = {'A': cola, 'B': colb}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('selector', res.filter['filter']['type'])
 
     def test_get_filters_handles_arrays_for_string_types(self):
         filtr = {'col': 'A', 'op': '==', 'val': ['a', 'b']}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('a', res.filter['filter']['value'])
 
         filtr = {'col': 'A', 'op': '==', 'val': []}
-        res = DruidDatasource.get_filters([filtr], [])
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertIsNone(res.filter['filter']['value'])
 
     def test_get_filters_handles_none_for_string_types(self):
         filtr = {'col': 'A', 'op': '==', 'val': None}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertIsNone(res)
 
     def test_get_filters_extracts_values_in_quotes(self):
         filtr = {'col': 'A', 'op': 'in', 'val': [' "a" ']}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('a', res.filter['filter']['value'])
 
     def test_get_filters_converts_strings_to_num(self):
         filtr = {'col': 'A', 'op': 'in', 'val': ['6']}
-        res = DruidDatasource.get_filters([filtr], ['A'])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], ['A'], column_dict)
         self.assertEqual(6, res.filter['filter']['value'])
         filtr = {'col': 'A', 'op': '==', 'val': '6'}
-        res = DruidDatasource.get_filters([filtr], ['A'])
+        res = DruidDatasource.get_filters([filtr], ['A'], column_dict)
         self.assertEqual(6, res.filter['filter']['value'])
 
     def test_run_query_no_groupby(self):
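
For reviewers, a minimal usage sketch of the new code path. The column name, lookup map, and values below are illustrative placeholders, not taken from this PR; the construction mirrors what _create_extraction_fn() and get_filters() do for a dimension spec carrying a 'lookup'/'map' extractionFn, and prints the serialized spec that the tests above assert against.

# Minimal sketch (illustrative values): build a map-lookup extraction function
# and an 'in' filter over the mapped values, as the new get_filters() path does.
from pydruid.utils.dimensions import MapLookupExtraction
from pydruid.utils.filters import Filter

# Hypothetical lookup map: raw device identifiers -> human-readable names.
device_map = {
    'iPhone10,3': 'iPhone X',
    'iPhone10,6': 'iPhone X',
}
extraction_fn = MapLookupExtraction(
    device_map,
    replace_missing_values=None,
    retain_missing_values=False,
    injective=False,
)
# The extraction function is applied to the underlying 'device' column
# before the 'in' comparison against the mapped value.
f = Filter(
    dimension='device',
    values=['iPhone X'],
    type='in',
    extraction_function=extraction_fn,
)
print(f.filter['filter'])  # serialized Druid filter spec, as asserted in the tests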