Skip to content

Commit

Permalink
[druid] adding support for dimensionspecs
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Nov 4, 2016
1 parent 757e7de commit fb5ee87
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 19 deletions.
22 changes: 22 additions & 0 deletions caravel/migrations/versions/c611f2b591b8_dim_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""dim_spec
Revision ID: c611f2b591b8
Revises: ad4d656d92bc
Create Date: 2016-11-02 17:36:04.970448
"""

# revision identifiers, used by Alembic.
revision = 'c611f2b591b8'
down_revision = 'ad4d656d92bc'

from alembic import op
import sqlalchemy as sa


def upgrade():
op.add_column('columns', sa.Column('dimension_spec_json', sa.Text(), nullable=True))


def downgrade():
op.drop_column('columns', 'dimension_spec_json')
20 changes: 19 additions & 1 deletion caravel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1884,6 +1884,8 @@ def query( # druid
all_metrics = []
post_aggs = {}

columns_dict = {c.column_name: c for c in self.columns}

def recursive_get_fields(_conf):
_fields = _conf.get('fields', [])
field_names = []
Expand Down Expand Up @@ -1931,9 +1933,19 @@ def recursive_get_fields(_conf):
"Access to the metrics denied: " + ', '.join(rejected_metrics)
)

# the dimensions list with dimensionSpecs expanded
dimensions = []
groupby = [gb for gb in groupby if gb in columns_dict]
for column_name in groupby:
col = columns_dict.get(column_name)
dim_spec = col.dimension_spec
if dim_spec:
dimensions.append(dim_spec)
else:
dimensions.append(column_name)
qry = dict(
datasource=self.datasource_name,
dimensions=groupby,
dimensions=dimensions,
aggregations=aggregations,
granularity=DruidDatasource.granularity(
granularity,
Expand Down Expand Up @@ -2242,6 +2254,7 @@ class DruidColumn(Model, AuditMixinNullable):
min = Column(Boolean, default=False)
filterable = Column(Boolean, default=False)
description = Column(Text)
dimension_spec_json = Column(Text)

def __repr__(self):
return self.column_name
Expand All @@ -2250,6 +2263,11 @@ def __repr__(self):
def isnum(self):
return self.type in ('LONG', 'DOUBLE', 'FLOAT', 'INT')

@property
def dimension_spec(self):
if self.dimension_spec_json:
return json.loads(self.dimension_spec_json)

def generate_metrics(self):
"""Generate metrics based on the column metadata"""
M = DruidMetric # noqa
Expand Down
27 changes: 21 additions & 6 deletions caravel/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,9 @@ class TableColumnInlineView(CompactCRUDMixin, CaravelModelView): # noqa
class DruidColumnInlineView(CompactCRUDMixin, CaravelModelView): # noqa
datamodel = SQLAInterface(models.DruidColumn)
edit_columns = [
'column_name', 'description', 'datasource', 'groupby',
'count_distinct', 'sum', 'min', 'max']
'column_name', 'description', 'dimension_spec_json', 'datasource',
'groupby', 'count_distinct', 'sum', 'min', 'max']
add_columns = edit_columns
list_columns = [
'column_name', 'type', 'groupby', 'filterable', 'count_distinct',
'sum', 'min', 'max']
Expand All @@ -374,9 +375,23 @@ class DruidColumnInlineView(CompactCRUDMixin, CaravelModelView): # noqa
'min': _("Min"),
'max': _("Max"),
}
description_columns = {
'dimension_spec_json': utils.markdown(
"this field can be used to specify "
"a `dimensionSpec` as documented [here]"
"(http://druid.io/docs/latest/querying/dimensionspecs.html). "
"Make sure to input valid JSON and that the "
"`outputName` matches the `column_name` defined "
"above.",
True),
}

def post_update(self, col):
col.generate_metrics()
utils.validate_json(col.dimension_spec_json)

def post_add(self, col):
self.post_update(col)

appbuilder.add_view_no_menu(DruidColumnInlineView)

Expand Down Expand Up @@ -707,11 +722,11 @@ class DruidClusterModelView(CaravelModelView, DeleteMixin): # noqa
'broker_endpoint': _("Broker Endpoint"),
}

def pre_add(self, db):
utils.merge_perm(sm, 'database_access', db.perm)
def pre_add(self, cluster):
utils.merge_perm(sm, 'database_access', cluster.perm)

def pre_update(self, db):
self.pre_add(db)
def pre_update(self, cluster):
self.pre_add(cluster)


if config['DRUID_IS_ACTIVE']:
Expand Down
1 change: 0 additions & 1 deletion caravel/viz.py
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,6 @@ class NVD3TimeSeriesViz(NVD3Viz):
def get_df(self, query_obj=None):
form_data = self.form_data
df = super(NVD3TimeSeriesViz, self).get_df(query_obj)

df = df.fillna(0)
if form_data.get("granularity") == "all":
raise Exception("Pick a time granularity for your time series")
Expand Down
2 changes: 0 additions & 2 deletions tests/core_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
import io
import random
import unittest
from datetime import datetime

from flask import escape
from flask_appbuilder.security.sqla import models as ab_models

from caravel import db, models, utils, appbuilder, sm, jinja_context
from caravel.views import DatabaseView
Expand Down
18 changes: 9 additions & 9 deletions tests/druid_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,16 @@
"version": "v1",
"timestamp": "2012-01-01T00:00:00.000Z",
"event": {
"name": 'Canada',
"sum__num": 12345678,
"dim1": 'Canada',
"metric1": 12345678,
}
},
{
"version": "v1",
"timestamp": "2012-01-01T00:00:00.000Z",
"event": {
"name": 'USA',
"sum__num": 12345678 / 2,
"dim1": 'USA',
"metric1": 12345678 / 2,
}
},
]
Expand Down Expand Up @@ -121,26 +121,26 @@ def test_client(self, PyDruid):
url = (
'/caravel/explore_json/druid/{}/?viz_type=table&granularity=one+day&'
'druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&'
'include_search=false&metrics=count&groupby=name&flt_col_0=dim1&'
'include_search=false&metrics=count&groupby=dim1&flt_col_0=dim1&'
'flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&'
'action=&datasource_name=test_datasource&datasource_id={}&'
'datasource_type=druid&previous_viz_type=table&'
'force=true'.format(datasource_id, datasource_id))
resp = self.get_json_resp(url)
self.assertEqual("Canada", resp['data']['records'][0]['name'])
self.assertEqual("Canada", resp['data']['records'][0]['dim1'])

# two groupby
url = (
'/caravel/explore_json/druid/{}/?viz_type=table&granularity=one+day&'
'druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&'
'include_search=false&metrics=count&groupby=name&'
'flt_col_0=dim1&groupby=second&'
'include_search=false&metrics=count&groupby=dim1&'
'flt_col_0=dim1&groupby=dim2d&'
'flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&'
'action=&datasource_name=test_datasource&datasource_id={}&'
'datasource_type=druid&previous_viz_type=table&'
'force=true'.format(datasource_id, datasource_id))
resp = self.get_json_resp(url)
self.assertEqual("Canada", resp['data']['records'][0]['name'])
self.assertEqual("Canada", resp['data']['records'][0]['dim1'])

def test_druid_sync_from_config(self):
CLUSTER_NAME = 'new_druid'
Expand Down

0 comments on commit fb5ee87

Please sign in to comment.