Skip to content

Commit

Permalink
Skeleton for remote query execution using celery. (#908)
Browse files Browse the repository at this point in the history
* Carapal react mockup

This is really just a mock up written in React to try different
components. It could become scaffolding to build a prototype, or not.

* Preliminary commit for Celery backend

* Move the SQL query execution to the celery worker.

* React scetch

* Refactor SQL execution to use the celery if configured.

* Refactor SQL execution to use the celery if configured.

* Add query model

* Remove QueryResult. Query has a tmp_table_name field that has all the data.

* Add create table as wrapper.

* Create table as

* Address the comments.

* Add trailing commas

* Remove the init_query test.

* Handle 'undefined' schema case
  • Loading branch information
bkyryliuk authored and mistercrunch committed Aug 11, 2016
1 parent 7e55c6b commit 754d0cf
Show file tree
Hide file tree
Showing 13 changed files with 843 additions and 77 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ dist
caravel.egg-info/
app.db
*.bak
.idea
*.sqllite

# Node.js, webpack artifacts
*.entry.js
Expand Down
22 changes: 21 additions & 1 deletion caravel/bin/caravel
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ from __future__ import print_function
from __future__ import unicode_literals

import logging
import celery
from celery.bin import worker as celery_worker
from datetime import datetime
from subprocess import Popen
import textwrap

from flask_migrate import MigrateCommand
from flask_script import Manager
Expand Down Expand Up @@ -127,5 +128,24 @@ def refresh_druid():
session.commit()


@manager.command
def worker():
"""Starts a Caravel worker for async SQL query execution."""
# celery -A tasks worker --loglevel=info
print("Starting SQL Celery worker.")
if config.get('CELERY_CONFIG'):
print("Celery broker url: ")
print(config.get('CELERY_CONFIG').BROKER_URL)

application = celery.current_app._get_current_object()
c_worker = celery_worker.worker(app=application)
options = {
'broker': config.get('CELERY_CONFIG').BROKER_URL,
'loglevel': 'INFO',
'traceback': True,
}
c_worker.run(**options)


if __name__ == "__main__":
manager.run()
16 changes: 16 additions & 0 deletions caravel/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,22 @@

# Set this API key to enable Mapbox visualizations
MAPBOX_API_KEY = ""
# Maximum number of rows returned in the SQL editor
SQL_MAX_ROW = 1000

# Default celery config is to use SQLA as a broker, in a production setting
# you'll want to use a proper broker as specified here:
# http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
"""
# Example:
class CeleryConfig(object):
BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'
CELERY_IMPORTS = ('caravel.tasks', )
CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite'
CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
CELERY_CONFIG = CeleryConfig
"""
CELERY_CONFIG = None

try:
from caravel_config import * # noqa
Expand All @@ -188,3 +203,4 @@

if not CACHE_DEFAULT_TIMEOUT:
CACHE_DEFAULT_TIMEOUT = CACHE_CONFIG.get('CACHE_DEFAULT_TIMEOUT')

39 changes: 39 additions & 0 deletions caravel/migrations/versions/ad82a75afd82_add_query_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Update models to support storing the queries.
Revision ID: ad82a75afd82
Revises: f162a1dea4c4
Create Date: 2016-07-25 17:48:12.771103
"""

# revision identifiers, used by Alembic.
revision = 'ad82a75afd82'
down_revision = 'f162a1dea4c4'

from alembic import op
import sqlalchemy as sa

def upgrade():
op.create_table('query',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('database_id', sa.Integer(), nullable=False),
sa.Column('tmp_table_name', sa.String(length=64), nullable=True),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=16), nullable=True),
sa.Column('name', sa.String(length=64), nullable=True),
sa.Column('sql', sa.Text, nullable=True),
sa.Column('limit', sa.Integer(), nullable=True),
sa.Column('progress', sa.Integer(), nullable=True),
sa.Column('start_time', sa.DateTime(), nullable=True),
sa.Column('end_time', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['database_id'], [u'dbs.id'], ),
sa.ForeignKeyConstraint(['user_id'], [u'ab_user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.add_column('dbs', sa.Column('select_as_create_table_as', sa.Boolean(),
nullable=True))


def downgrade():
op.drop_table('query')
op.drop_column('dbs', 'select_as_create_table_as')
42 changes: 40 additions & 2 deletions caravel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@
from pydruid.utils.having import Aggregation
from six import string_types
from sqlalchemy import (
Column, Integer, String, ForeignKey, Text, Boolean, DateTime, Date,
Table, create_engine, MetaData, desc, asc, select, and_, func)
Column, Integer, String, ForeignKey, Text, Boolean, DateTime, Date, Table,
create_engine, MetaData, desc, asc, select, and_, func
)
from sqlalchemy.engine import reflection
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import relationship
Expand Down Expand Up @@ -378,6 +379,7 @@ class Database(Model, AuditMixinNullable):
sqlalchemy_uri = Column(String(1024))
password = Column(EncryptedType(String(1024), config.get('SECRET_KEY')))
cache_timeout = Column(Integer)
select_as_create_table_as = Column(Boolean, default=True)
extra = Column(Text, default=textwrap.dedent("""\
{
"metadata_params": {},
Expand Down Expand Up @@ -1706,3 +1708,39 @@ class FavStar(Model):
class_name = Column(String(50))
obj_id = Column(Integer)
dttm = Column(DateTime, default=func.now())


class QueryStatus:
SCHEDULED = 'SCHEDULED'
CANCELLED = 'CANCELLED'
IN_PROGRESS = 'IN_PROGRESS'
FINISHED = 'FINISHED'
TIMED_OUT = 'TIMED_OUT'
FAILED = 'FAILED'


class Query(Model):

"""ORM model for SQL query"""

__tablename__ = 'query'
id = Column(Integer, primary_key=True)

database_id = Column(Integer, ForeignKey('dbs.id'), nullable=False)

# Store the tmp table into the DB only if the user asks for it.
tmp_table_name = Column(String(64))
user_id = Column(Integer, ForeignKey('ab_user.id'), nullable=True)

# models.QueryStatus
status = Column(String(16))

name = Column(String(64))
sql = Column(Text)
# Could be configured in the caravel config
limit = Column(Integer)

# 1..100
progress = Column(Integer)
start_time = Column(DateTime)
end_time = Column(DateTime)
219 changes: 219 additions & 0 deletions caravel/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
import celery
from caravel import models, app, utils
from datetime import datetime
import logging
from sqlalchemy import create_engine, select, text
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.sql.expression import TextAsFrom
import sqlparse
import pandas as pd

celery_app = celery.Celery(config_source=app.config.get('CELERY_CONFIG'))


def is_query_select(sql):
try:
return sqlparse.parse(sql)[0].get_type() == 'SELECT'
# Capture sqlparse exceptions, worker shouldn't fail here.
except Exception:
# TODO(bkyryliuk): add logging here.
return False


# if sqlparse provides the stream of tokens but don't provide the API
# to access the table names, more on it:
# https://groups.google.com/forum/#!topic/sqlparse/sL2aAi6dSJU
# https://github.com/andialbrecht/sqlparse/blob/master/examples/
# extract_table_names.py
#
# Another approach would be to run the EXPLAIN on the sql statement:
# https://prestodb.io/docs/current/sql/explain.html
# https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Explain
def get_tables():
"""Retrieves the query names from the query."""
# TODO(bkyryliuk): implement parsing the sql statement.
pass


def add_limit_to_the_query(sql, limit, eng):
# Treat as single sql statement in case of failure.
sql_statements = [sql]
try:
sql_statements = [s for s in sqlparse.split(sql) if s]
except Exception as e:
logging.info(
"Statement " + sql + "failed to be transformed to have the limit "
"with the exception" + e.message)
return sql
if len(sql_statements) == 1 and is_query_select(sql):
qry = select('*').select_from(
TextAsFrom(text(sql_statements[0]), ['*']).alias(
'inner_qry')).limit(limit)
sql_statement = str(qry.compile(
eng, compile_kwargs={"literal_binds": True}))
return sql_statement
return sql


# create table works only for the single statement.
def create_table_as(sql, table_name, override=False):
"""Reformats the query into the create table as query.
Works only for the single select SQL statements, in all other cases
the sql query is not modified.
:param sql: string, sql query that will be executed
:param table_name: string, will contain the results of the query execution
:param override, boolean, table table_name will be dropped if true
:return: string, create table as query
"""
# TODO(bkyryliuk): drop table if allowed, check the namespace and
# the permissions.
# Treat as single sql statement in case of failure.
sql_statements = [sql]
try:
# Filter out empty statements.
sql_statements = [s for s in sqlparse.split(sql) if s]
except Exception as e:
logging.info(
"Statement " + sql + "failed to be transformed as create table as "
"with the exception" + e.message)
return sql
if len(sql_statements) == 1 and is_query_select(sql):
updated_sql = ''
# TODO(bkyryliuk): use sqlalchemy statements for the
# the drop and create operations.
if override:
updated_sql = 'DROP TABLE IF EXISTS {};\n'.format(table_name)
updated_sql += "CREATE TABLE %s AS %s" % (
table_name, sql_statements[0])
return updated_sql
return sql


def get_session():
"""Creates new SQLAlchemy scoped_session."""
engine = create_engine(
app.config.get('SQLALCHEMY_DATABASE_URI'), convert_unicode=True)
return scoped_session(sessionmaker(
autocommit=False, autoflush=False, bind=engine))


@celery_app.task
def get_sql_results(database_id, sql, user_id, tmp_table_name="", schema=None):
"""Executes the sql query returns the results.
:param database_id: integer
:param sql: string, query that will be executed
:param user_id: integer
:param tmp_table_name: name of the table for CTA
:param schema: string, name of the schema (used in presto)
:return: dataframe, query result
"""
# Create a separate session, reusing the db.session leads to the
# concurrency issues.
session = get_session()
try:
db_to_query = (
session.query(models.Database).filter_by(id=database_id).first()
)
except Exception as e:
return {
'error': utils.error_msg_from_exception(e),
'success': False,
}
if not db_to_query:
return {
'error': "Database with id {0} is missing.".format(database_id),
'success': False,
}

# TODO(bkyryliuk): provide a way for the user to name the query.
# TODO(bkyryliuk): run explain query to derive the tables and fill in the
# table_ids
# TODO(bkyryliuk): check the user permissions
# TODO(bkyryliuk): store the tab name in the query model
limit = app.config.get('SQL_MAX_ROW', None)
start_time = datetime.now()
if not tmp_table_name:
tmp_table_name = 'tmp.{}_table_{}'.format(user_id, start_time)
query = models.Query(
user_id=user_id,
database_id=database_id,
limit=limit,
name='{}'.format(start_time),
sql=sql,
start_time=start_time,
tmp_table_name=tmp_table_name,
status=models.QueryStatus.IN_PROGRESS,
)
session.add(query)
session.commit()
query_result = get_sql_results_as_dict(
db_to_query, sql, query.tmp_table_name, schema=schema)
query.end_time = datetime.now()
if query_result['success']:
query.status = models.QueryStatus.FINISHED
else:
query.status = models.QueryStatus.FAILED
session.commit()
# TODO(bkyryliuk): return the tmp table / query_id
return query_result


# TODO(bkyryliuk): merge the changes made in the carapal first
# before merging this PR.
def get_sql_results_as_dict(db_to_query, sql, tmp_table_name, schema=None):
"""Get the SQL query results from the give session and db connection.
:param sql: string, query that will be executed
:param db_to_query: models.Database to query, cannot be None
:param tmp_table_name: name of the table for CTA
:param schema: string, name of the schema (used in presto)
:return: (dataframe, boolean), results and the status
"""
eng = db_to_query.get_sqla_engine(schema=schema)
sql = sql.strip().strip(';')
# TODO(bkyryliuk): fix this case for multiple statements
if app.config.get('SQL_MAX_ROW'):
sql = add_limit_to_the_query(
sql, app.config.get("SQL_MAX_ROW"), eng)

cta_used = False
if (app.config.get('SQL_SELECT_AS_CTA') and
db_to_query.select_as_create_table_as and is_query_select(sql)):
# TODO(bkyryliuk): figure out if the query is select query.
sql = create_table_as(sql, tmp_table_name)
cta_used = True

if cta_used:
try:
eng.execute(sql)
return {
'tmp_table': tmp_table_name,
'success': True,
}
except Exception as e:
return {
'error': utils.error_msg_from_exception(e),
'success': False,
}

# otherwise run regular SQL query.
# TODO(bkyryliuk): rewrite into eng.execute as queries different from
# select should be permitted too.
try:
df = db_to_query.get_df(sql, schema)
df = df.fillna(0)
return {
'columns': [c for c in df.columns],
'data': df.to_dict(orient='records'),
'success': True,
}

except Exception as e:
return {
'error': utils.error_msg_from_exception(e),
'success': False,
}


Loading

0 comments on commit 754d0cf

Please sign in to comment.