From 374802e437720ebe4a230115e3b4d7c61f2fff2c Mon Sep 17 00:00:00 2001
From: Maxime
Date: Thu, 30 Jul 2015 06:39:30 +0000
Subject: [PATCH] Now enabling multi-cluster, connection info managed in UI

---
 app/__init__.py |  2 ++
 app/models.py   | 71 +++++++++++++++++++++++++++++++++----------------
 app/utils.py    |  8 ++----
 app/views.py    | 51 ++++++++++++++++++++++-------------
 app/viz.py      |  6 ++---
 config.py       |  8 ------
 6 files changed, 88 insertions(+), 58 deletions(-)

diff --git a/app/__init__.py b/app/__init__.py
index 9bbdd8ea963fb..97c83055919c3 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -20,4 +20,6 @@ class MyIndexView(IndexView):
     app, db.session, base_template='panoramix/base.html', indexview=MyIndexView)
 
+get_session = appbuilder.get_session
+
 from app import views
 
diff --git a/app/models.py b/app/models.py
index b3e7bb646e1e2..f0591881b69a2 100644
--- a/app/models.py
+++ b/app/models.py
@@ -1,15 +1,15 @@
 from flask.ext.appbuilder import Model
-from datetime import datetime, timedelta
-from flask.ext.appbuilder.models.mixins import AuditMixin, FileColumn, ImageColumn
-from flask.ext.appbuilder.security.sqla.models import User
+from pydruid import client
+from datetime import timedelta
+from flask.ext.appbuilder.models.mixins import AuditMixin, FileColumn
 from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean, DateTime
 from sqlalchemy.orm import relationship
-from app import db, utils
+from app import get_session
 from dateutil.parser import parse
-import json
-
-client = utils.get_pydruid_client()
+import logging
+import json
+import requests
 
 
 class Cluster(Model, AuditMixin):
@@ -24,6 +24,27 @@ class Cluster(Model, AuditMixin):
     broker_endpoint = Column(String(256))
     metadata_last_refreshed = Column(DateTime)
 
+    def __repr__(self):
+        return self.cluster_name
+
+    def get_pydruid_client(self):
+        cli = client.PyDruid(
+            "http://{0}:{1}/".format(self.broker_host, self.broker_port),
+            self.broker_endpoint)
+        return cli
+
+    def refresh_datasources(self):
+        endpoint = (
+            "http://{self.coordinator_host}:{self.coordinator_port}/"
+            "{self.coordinator_endpoint}/datasources"
+        ).format(self=self)
+        datasources = json.loads(requests.get(endpoint).text)
+        for datasource in datasources:
+            try:
+                Datasource.sync_to_db(datasource, self)
+            except Exception as e:
+                logging.exception(e)
+                logging.error("Failed at syncing " + datasource)
 
 class Datasource(Model, AuditMixin):
     __tablename__ = 'datasources'
@@ -60,15 +81,15 @@ def get_metric_obj(self, metric_name):
             if m.metric_name == metric_name
         ][0]
 
-    @classmethod
-    def latest_metadata(cls, name):
-        results = client.time_boundary(datasource=name)
+    def latest_metadata(self):
+        client = self.cluster.get_pydruid_client()
+        results = client.time_boundary(datasource=self.datasource_name)
         max_time = results[0]['result']['minTime']
         max_time = parse(max_time)
         intervals = (max_time - timedelta(seconds=1)).isoformat() + '/'
         intervals += (max_time + timedelta(seconds=1)).isoformat()
         segment_metadata = client.segment_metadata(
-            datasource=name,
+            datasource=self.datasource_name,
             intervals=intervals)
         if segment_metadata:
             return segment_metadata[-1]['columns']
@@ -78,16 +99,20 @@ def generate_metrics(self):
             col.generate_metrics()
 
     @classmethod
-    def sync_to_db(cls, name):
-        datasource = db.session.query(cls).filter_by(datasource_name=name).first()
+    def sync_to_db(cls, name, cluster):
+        session = get_session()
+        datasource = session.query(cls).filter_by(datasource_name=name).first()
         if not datasource:
-            db.session.add(cls(datasource_name=name))
-        cols = cls.latest_metadata(name)
+            datasource = cls(datasource_name=name)
+            session.add(datasource)
+        datasource.cluster = cluster
+
+        cols = datasource.latest_metadata()
         if not cols:
             return
         for col in cols:
             col_obj = (
-                db.session
+                session
                 .query(Column)
                 .filter_by(datasource_name=name, column_name=col)
                 .first()
@@ -95,14 +120,14 @@
             datatype = cols[col]['type']
             if not col_obj:
                 col_obj = Column(datasource_name=name, column_name=col)
-                db.session.add(col_obj)
+                session.add(col_obj)
             if datatype == "STRING":
                 col_obj.groupby = True
                 col_obj.filterable = True
             if col_obj:
                 col_obj.type = cols[col]['type']
                 col_obj.generate_metrics()
-        db.session.commit()
+        # no commit here; the caller commits once all datasources are synced
 
     @property
     def column_names(self):
@@ -171,8 +196,7 @@ def generate_metrics(self):
             metric_name='count',
             verbose_name='COUNT(*)',
             metric_type='count',
-            json=json.dumps({
-                'type': 'count', 'name': 'count'})
+            json=json.dumps({'type': 'count', 'name': 'count'})
         ))
 
         if self.sum and self.isnum:
@@ -217,14 +241,15 @@ def generate_metrics(self):
                 'name': name,
                 'fieldNames': [self.column_name]})
             ))
+        session = get_session()
         for metric in metrics:
             m = (
-                db.session.query(M)
+                session.query(M)
                 .filter(M.datasource_name==self.datasource_name)
                 .filter(M.metric_name==metric.metric_name)
                 .first()
             )
             metric.datasource_name = self.datasource_name
             if not m:
-                db.session.add(metric)
-        db.session.commit()
+                session.add(metric)
+        session.commit()
diff --git a/app/utils.py b/app/utils.py
index 59ced72a13839..6276b002ee5b2 100644
--- a/app/utils.py
+++ b/app/utils.py
@@ -1,13 +1,9 @@
 import config
-from datetime import timedelta, datetime
+from datetime import datetime
 import parsedatetime
+from app import db
 
 
-def get_pydruid_client():
-    from pydruid import client
-    return client.PyDruid(
-        "http://{0}:{1}/".format(config.DRUID_HOST, config.DRUID_PORT),
-        config.DRUID_BASE_ENDPOINT)
 
 
 def parse_human_datetime(s):
diff --git a/app/views.py b/app/views.py
index c843f36eee3de..26c04b8bb7d83 100644
--- a/app/views.py
+++ b/app/views.py
@@ -1,11 +1,11 @@
-from datetime import timedelta
+from datetime import datetime
 import logging
 import json
 
 from flask import request, redirect, flash, Response
 from flask.ext.appbuilder.models.sqla.interface import SQLAInterface
 from flask.ext.appbuilder import ModelView, CompactCRUDMixin, BaseView, expose
-from app import appbuilder, db, models, viz, utils, app
+from app import appbuilder, db, models, viz, utils, app, get_session
 from flask.ext.appbuilder.security.decorators import has_access, permission_name
 import config
 from pydruid.client import doublesum
@@ -62,13 +62,32 @@ class MetricInlineView(CompactCRUDMixin, ModelView):
 
 appbuilder.add_view_no_menu(MetricInlineView)
 
 
+class ClusterModelView(ModelView, DeleteMixin):
+    datamodel = SQLAInterface(models.Cluster)
+    add_columns = [
+        'cluster_name',
+        'coordinator_host', 'coordinator_port', 'coordinator_endpoint',
+        'broker_host', 'broker_port', 'broker_endpoint',
+    ]
+    edit_columns = add_columns
+    list_columns = ['cluster_name', 'metadata_last_refreshed']
+
+appbuilder.add_view(
+    ClusterModelView,
+    "Clusters",
+    icon="fa-server",
+    category="Admin",
+    category_icon='fa-envelope')
+
+
 class DatasourceModelView(ModelView, DeleteMixin):
     datamodel = SQLAInterface(models.Datasource)
-    list_columns = ['datasource_link', 'owner', 'is_featured', 'is_hidden']
+    list_columns = [
+        'datasource_link', 'cluster', 'owner', 'is_featured', 'is_hidden']
     related_views = [ColumnInlineView, MetricInlineView]
     edit_columns = [
-        'datasource_name', 'description', 'owner', 'is_featured', 'is_hidden',
-        'default_endpoint']
+        'datasource_name', 'cluster', 'description', 'owner',
+        'is_featured', 'is_hidden', 'default_endpoint']
     page_size = 100
     base_order = ('datasource_name', 'asc')
 
@@ -129,19 +148,15 @@ def datasource(self, datasource_name):
     @permission_name('refresh_datasources')
     @expose("/refresh_datasources/")
     def refresh_datasources(self):
-        import requests
-        endpoint = (
-            "http://{COORDINATOR_HOST}:{COORDINATOR_PORT}/"
-            "{COORDINATOR_BASE_ENDPOINT}/datasources"
-        ).format(**config.__dict__)
-        datasources = json.loads(requests.get(endpoint).text)
-        for datasource in datasources:
-            try:
-                models.Datasource.sync_to_db(datasource)
-            except Exception as e:
-                logging.exception(e)
-                logging.error("Failed at syncing " + datasource)
-        flash("Refreshed metadata from Druid!", 'info')
+        session = db.session()
+        for cluster in session.query(models.Cluster).all():
+            cluster.refresh_datasources()
+            cluster.metadata_last_refreshed = datetime.now()
+            flash(
+                "Refreshed metadata from cluster "
+                "[" + cluster.cluster_name + "]",
+                'info')
+        session.commit()
         return redirect("/datasourcemodelview/list/")
 
     @expose("/autocomplete/<datasource>/<column>/")
diff --git a/app/viz.py b/app/viz.py
index 700c0d611ed9d..8dac33c97b7e7 100644
--- a/app/viz.py
+++ b/app/viz.py
@@ -164,12 +164,12 @@ def query_obj(self):
         return d
 
     def bake_query(self):
-        client = utils.get_pydruid_client()
+        client = self.datasource.cluster.get_pydruid_client()
        client.groupby(**self.query_obj())
         return client.export_pandas()
 
     def get_query(self):
-        client = utils.get_pydruid_client()
+        client = self.datasource.cluster.get_pydruid_client()
         client.groupby(**self.query_obj())
         return client.query_dict
 
@@ -265,7 +265,7 @@ def bake_query(self):
         """
         Doing a 2 phase query where we limit the number of series.
         """
-        client = utils.get_pydruid_client()
+        client = self.datasource.cluster.get_pydruid_client()
         qry = self.query_obj()
         orig_filter = qry['filter'] if 'filter' in qry else ''
         qry['granularity'] = "all"
diff --git a/config.py b/config.py
index 87d19d2cd17c9..ed71108fc108b 100644
--- a/config.py
+++ b/config.py
@@ -14,14 +14,6 @@
 #---------------------------------------------------------
 ROW_LIMIT = 5000
 
-DRUID_HOST = '0.0.0.0'
-DRUID_PORT = '8084'
-DRUID_BASE_ENDPOINT = 'druid/v2'
-
-COORDINATOR_HOST = '0.0.0.0'
-COORDINATOR_PORT = '8081'
-COORDINATOR_BASE_ENDPOINT = 'druid/coordinator/v1'
-
 PANORAMIX_WEBSERVER_PORT = 8088
 
 #---------------------------------------------------------
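
Reviewer note (not part of the patch): a minimal sketch of how the new
per-cluster client replaces the old module-level utils.get_pydruid_client().
The broker values reuse the defaults this patch removes from config.py; the
'wikipedia' datasource name is a hypothetical placeholder.

    from pydruid import client

    class Cluster(object):
        # Attribute-only stand-in for the SQLAlchemy model in app/models.py.
        def __init__(self, broker_host, broker_port, broker_endpoint):
            self.broker_host = broker_host
            self.broker_port = broker_port
            self.broker_endpoint = broker_endpoint

        def get_pydruid_client(self):
            # Same construction as in the patch: the broker URL is built
            # from row data rather than the removed DRUID_* constants.
            return client.PyDruid(
                "http://{0}:{1}/".format(self.broker_host, self.broker_port),
                self.broker_endpoint)

    cluster = Cluster('0.0.0.0', '8084', 'druid/v2')
    cli = cluster.get_pydruid_client()
    # The same call Datasource.latest_metadata() now issues per datasource:
    print(cli.time_boundary(datasource='wikipedia'))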
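Similarly, Cluster.refresh_datasources() reduces to a single coordinator
call. This sketch mirrors it with the coordinator defaults removed from
config.py inlined as literals; with no Druid coordinator listening, the
requests.get() call will simply raise a ConnectionError.

    import json
    import requests

    coordinator_host = '0.0.0.0'                   # was COORDINATOR_HOST
    coordinator_port = '8081'                      # was COORDINATOR_PORT
    coordinator_endpoint = 'druid/coordinator/v1'  # was COORDINATOR_BASE_ENDPOINT

    url = "http://{0}:{1}/{2}/datasources".format(
        coordinator_host, coordinator_port, coordinator_endpoint)
    # The coordinator responds with a JSON array of datasource names.
    for name in json.loads(requests.get(url).text):
        print("would call models.Datasource.sync_to_db(%r, cluster)" % name)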