Skip to content

Commit

Permalink
Using sqlalchemy expression API to support all SQL dialects
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Aug 12, 2015
1 parent 3a0eb07 commit 59efbcd
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 7 deletions.
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ Buzz Phrases
Database Support
----------------

Panoramix was originally designed on top of Druid.io, but quickly broadened
to support other databases through the use of SQLAlchemy, a Python
Panoramix was originally designed on top of Druid.io, but quickly broadened
its scope to support other databases through the use of SQLAlchemy, a Python
ORM that is compatible with
[many external databases](http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html).
At the moment the SQL has been hard coded to use ``LIMIT``-type
dialect and needs to be extended to support other syntax
(``TOP``, ``ROWNUM``, ...)

What's Druid?
-------------
Expand Down
91 changes: 89 additions & 2 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@
from dateutil.parser import parse
from pydruid import client
from pydruid.utils.filters import Dimension, Filter
from pandas import read_sql_query
from sqlalchemy.sql import table, literal_column
from sqlalchemy import select, and_, text, String

from copy import deepcopy, copy
from collections import namedtuple
from datetime import datetime
import logging
import json
import sqlparse
import requests
import textwrap

from app import db, get_session
from app import db, get_session, utils

QueryResult = namedtuple('namedtuple', ['df', 'query', 'duration'])

Expand Down Expand Up @@ -87,14 +91,19 @@ def metrics_combo(self):
for m in self.metrics],
key=lambda x: x[1])

def query(
def query_bkp(
self, groupby, metrics,
granularity,
from_dttm, to_dttm,
limit_spec=None,
filter=None,
is_timeseries=True,
timeseries_limit=15, row_limit=None):
"""
Unused, legacy way of querying by building a SQL string without
using the sqlalchemy expression API (new approach which supports
all dialects)
"""
from pandas import read_sql_query
qry_start_dttm = datetime.now()
metrics_exprs = [
Expand Down Expand Up @@ -172,6 +181,82 @@ def query(
return QueryResult(
df=df, duration=datetime.now() - qry_start_dttm, query=sql)

def query(
        self, groupby, metrics,
        granularity,
        from_dttm, to_dttm,
        limit_spec=None,
        filter=None,
        is_timeseries=True,
        timeseries_limit=15, row_limit=None):
    """
    Build a grouped SELECT with the sqlalchemy expression API, run it
    against this table's database and return a ``QueryResult``.

    :param groupby: column names to select and group by (may be empty)
    :param metrics: names of the metrics (matched against
        ``self.metrics`` by ``metric_name``) to select
    :param granularity: when different from ``"all"``, the main datetime
        column is added to the select list and to the GROUP BY
    :param from_dttm: inclusive lower bound on the main datetime column
    :param to_dttm: exclusive upper bound on the main datetime column
    :param limit_spec: accepted for interface compatibility; not used here
    :param filter: iterable of ``(column, operator, value)`` triples; only
        the ``'in'`` and ``'not in'`` operators are applied, with ``value``
        being a comma separated string. ``None`` means no extra filter.
    :param is_timeseries: accepted for interface compatibility; not used
        here
    :param timeseries_limit: when truthy and ``groupby`` is non-empty, the
        result is restricted to that many group-by combinations, picked
        by descending value of the first requested metric
    :param row_limit: LIMIT applied to the outer query (``None`` for no
        limit)
    :returns: ``QueryResult(df, query, duration)`` where ``query`` is the
        re-indented SQL that was executed
    :raises IndexError: if ``metrics[0]`` is not a known metric name
    """
    qry_start_dttm = datetime.now()

    # Tolerate ``None`` for the optional collections.
    groupby = groupby or []
    metrics = metrics or []
    filter = filter or []

    timestamp = literal_column(
        self.main_datetime_column.column_name).label('timestamp')
    metrics_exprs = [
        literal_column(m.expression).label(m.metric_name)
        for m in self.metrics if m.metric_name in metrics]

    # The main metric ranks the group-by combinations kept by the
    # series-limiting subquery below.
    if metrics:
        main_metric_expr = literal_column(
            [m.expression for m in self.metrics
             if m.metric_name == metrics[0]][0])
    else:
        main_metric_expr = literal_column("COUNT(*)")

    select_exprs = [literal_column(s) for s in groupby]
    groupby_exprs = [literal_column(s) for s in groupby]
    # Prefixed aliases keep the subquery's columns distinguishable from
    # the outer table's columns in the join condition.
    inner_groupby_exprs = [
        literal_column(s).label('__' + s) for s in groupby]

    select_exprs += metrics_exprs
    if granularity != "all":
        select_exprs += [timestamp]
        groupby_exprs += [timestamp]

    qry = select(select_exprs)
    from_clause = table(self.table_name)
    qry = qry.group_by(*groupby_exprs)

    where_clause_and = [
        timestamp >= from_dttm.isoformat(),
        timestamp < to_dttm.isoformat(),
    ]
    # NOTE(review): ``col`` is emitted verbatim through literal_column;
    # confirm callers only pass trusted column names.
    for col, op, eq in filter:
        if op in ('in', 'not in'):
            values = eq.split(",")
            cond = literal_column(col).in_(values)
            if op == 'not in':
                cond = ~cond
            where_clause_and.append(cond)
    qry = qry.where(and_(*where_clause_and))
    qry = qry.limit(row_limit)

    if timeseries_limit and groupby:
        subq = select(inner_groupby_exprs)
        subq = subq.select_from(table(self.table_name))
        subq = subq.where(and_(*where_clause_and))
        subq = subq.group_by(*inner_groupby_exprs)
        # Without an ORDER BY the LIMIT would keep arbitrary groups.
        subq = subq.order_by(main_metric_expr.desc())
        subq = subq.limit(timeseries_limit)
        on_clause = [
            literal_column(gb) == literal_column("__" + gb)
            for gb in groupby]

        from_clause = from_clause.join(subq.alias(), and_(*on_clause))

    qry = qry.select_from(from_clause)

    engine = self.database.get_sqla_engine()
    # Inline the bound values so the SQL string can be both executed and
    # shown to the user as-is.
    sql = str(qry.compile(engine, compile_kwargs={"literal_binds": True}))
    df = read_sql_query(
        sql=sql,
        con=engine
    )
    sql = sqlparse.format(sql, reindent=True)
    return QueryResult(
        df=df, duration=datetime.now() - qry_start_dttm, query=sql)


def fetch_metadata(self):
table = self.database.get_table(self.table_name)
Expand Down Expand Up @@ -421,7 +506,9 @@ def query(
}],
}
client.groupby(**pre_qry)
query_str += "// Two phase query\n// Phase 1\n"
query_str += json.dumps(client.query_dict, indent=2) + "\n"
query_str += "//\nPhase 2 (built based on phase one's results)\n"
df = client.export_pandas()
if not df is None and not df.empty:
dims = qry['dimensions']
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pydruid
pyhive
python-dateutil
requests
sqlparse

0 comments on commit 59efbcd

Please sign in to comment.