Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preliminary commit for Celery backend #1454

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions caravel/bin/caravel
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,11 @@ def refresh_druid():
session.commit()


@manager.command
def worker():
    """Starts a Caravel worker for async query load"""
    # Placeholder command: the worker is not implemented yet, so invoking it
    # always fails with the TODO marker below. The docstring above is left
    # verbatim because the command framework may surface it as help text.
    todo_marker = "# TODO! @b.kyryliuk"
    raise NotImplementedError(todo_marker)


# Script entry point: when executed directly (not imported), hand control to
# `manager`, the object the commands in this file are registered on.
if __name__ == "__main__":
    manager.run()
18 changes: 18 additions & 0 deletions caravel/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,23 @@
INTERVAL = 1
BACKUP_COUNT = 30

# The default Celery config uses SQLA as the broker. In a production setting
# you'll want to use a proper broker, as specified here:
# http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
"""
# Example:
class CeleryConfig(object):
    ## Broker settings.
    BROKER_URL = 'amqp://guest:guest@localhost:5672//'
    CELERY_IMPORTS = ('myapp.tasks', )
    CELERY_RESULT_BACKEND = 'db+sqlite:///results.db'
    CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
"""
# Celery configuration object (for instance a class shaped like the example
# in the string above). None means no Celery configuration is supplied.
CELERY_CONFIG = None

# Maximum number of rows returned in the SQL editor
SQL_MAX_ROW = 1000

try:
from caravel_config import * # noqa
Expand All @@ -181,3 +198,4 @@

if not CACHE_DEFAULT_TIMEOUT:
CACHE_DEFAULT_TIMEOUT = CACHE_CONFIG.get('CACHE_DEFAULT_TIMEOUT')

23 changes: 23 additions & 0 deletions caravel/migrations/versions/33459b145c15_allow_temp_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""allow_temp_table

Revision ID: 33459b145c15
Revises: d8bc074f7aad
Create Date: 2016-06-13 15:54:08.117103

"""

# revision identifiers, used by Alembic.
revision = '33459b145c15'
down_revision = 'd8bc074f7aad'

from alembic import op
import sqlalchemy as sa


def upgrade():
    """Add the nullable boolean ``allow_temp_table`` column to ``dbs``."""
    new_column = sa.Column('allow_temp_table', sa.Boolean(), nullable=True)
    op.add_column('dbs', new_column)


def downgrade():
    """Drop the ``allow_temp_table`` column from ``dbs``."""
    table_name, column_name = 'dbs', 'allow_temp_table'
    op.drop_column(table_name, column_name)
1 change: 1 addition & 0 deletions caravel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ class Database(Model, AuditMixinNullable):
sqlalchemy_uri = Column(String(1024))
password = Column(EncryptedType(String(1024), config.get('SECRET_KEY')))
cache_timeout = Column(Integer)
allow_temp_table = Column(Boolean, default=False)
extra = Column(Text, default=textwrap.dedent("""\
{
"metadata_params": {},
Expand Down
93 changes: 57 additions & 36 deletions caravel/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
from __future__ import print_function
from __future__ import unicode_literals

from datetime import datetime
import json
import logging
import re
import sys
import time
import traceback
from datetime import datetime

import pandas as pd
import sqlalchemy as sqla

import celery
from flask import (
g, request, redirect, flash, Response, render_template, Markup)
from flask_appbuilder import ModelView, CompactCRUDMixin, BaseView, expose
Expand All @@ -36,6 +37,47 @@
config = app.config
log_this = models.Log.log_this

# Celery application used to run SQL queries outside the web request.
# The configuration object must be passed as `config_source`: `Celery()`
# accepts arbitrary extra keyword arguments, so an unrecognised name such as
# `celery_config` is silently dropped and the CELERY_CONFIG setting would
# never be applied.
celery_app = celery.Celery(config_source=config.get('CELERY_CONFIG'))

@celery_app.task
def get_sql_results(database_id, sql, run_async=False, **kwargs):
    """Gets sql results from a Caravel database connection

    The third parameter was originally named ``async``. That name is a
    reserved keyword from Python 3.7 on, so it is exposed as ``run_async``;
    the old spelling is still honoured when passed by keyword.

    This is a module-level function with no view instance of its own, so it
    performs no permission check: the caller is responsible for authorising
    the user before invoking it.

    :param database_id: id of the ``models.Database`` row to query
    :param sql: SQL statement to execute
    :param run_async: reserved for asynchronous execution; currently unused
    :returns: the results rendered as an HTML table, an HTML alert holding
        the error message if the query failed, or ``""`` when no database
        matches ``database_id``
    :raises TypeError: on unexpected keyword arguments
    """
    run_async = kwargs.pop('async', run_async)
    if kwargs:
        raise TypeError(
            "get_sql_results() got unexpected keyword arguments: {}".format(
                ", ".join(sorted(kwargs))))
    # TODO @b.kyryliuk handle async
    # handle models.Queries (userid, sql, timestamps, status) index on userid, state, start_ddtm
    session = db.session()
    mydb = session.query(models.Database).filter_by(id=database_id).first()

    content = ""
    if mydb:
        eng = mydb.get_sqla_engine()
        # `config` is `app.config`, a mapping: settings are read with
        # .get(), not attribute access.
        max_rows = config.get('SQL_MAX_ROW')
        if max_rows:
            # Wrap the user's statement in an outer SELECT so the row limit
            # applies regardless of what the inner query looks like.
            sql = sql.strip().strip(';')
            qry = (
                select('*')
                .select_from(TextAsFrom(text(sql), ['*']).alias('inner_qry'))
                .limit(max_rows)
            )
            sql = str(qry.compile(eng, compile_kwargs={"literal_binds": True}))
        try:
            df = pd.read_sql_query(sql=sql, con=eng)
            content = df.to_html(
                index=False,
                na_rep='',
                classes=(
                    "dataframe table table-striped table-bordered "
                    "table-condensed sql_results").split(' '))
        except Exception as e:
            # Best effort: any failure is reported to the user as an alert.
            # `str(e)` works on Python 2 and 3 (`e.message` is Python 2
            # only), and the text is escaped because database errors can
            # echo back parts of the submitted SQL.
            content = (
                '<div class="alert alert-danger">'
                "{}</div>"
            ).format(Markup.escape(str(e)))
    session.commit()
    return content


def check_ownership(obj, raise_if_false=True):
"""Meant to be used in `pre_update` hooks on models to enforce ownership
Expand Down Expand Up @@ -285,7 +327,8 @@ class DatabaseView(CaravelModelView, DeleteMixin): # noqa
datamodel = SQLAInterface(models.Database)
list_columns = ['database_name', 'sql_link', 'creator', 'changed_on_']
add_columns = [
'database_name', 'sqlalchemy_uri', 'cache_timeout', 'extra']
'database_name', 'sqlalchemy_uri', 'cache_timeout', 'extra',
'allow_temp_table']
search_exclude_columns = ('password',)
edit_columns = add_columns
add_template = "caravel/models/database/add.html"
Expand All @@ -305,6 +348,10 @@ class DatabaseView(CaravelModelView, DeleteMixin): # noqa
"gets unpacked into the [sqlalchemy.MetaData]"
"(http://docs.sqlalchemy.org/en/rel_1_0/core/metadata.html"
"#sqlalchemy.schema.MetaData) call. ", True),
'allow_temp_table': (
"Whether Caravel can run async queries by and attempt to "
"store results in temporary tables"
),
}
label_columns = {
'database_name': _("Database"),
Expand Down Expand Up @@ -1018,45 +1065,19 @@ def select_star(self, database_id, table_name):
@log_this
def runsql(self):
    """Runs arbitrary sql and returns an html table

    Reads a JSON document from the ``data`` form field, expecting the keys
    ``sql`` and ``database_id``, and delegates execution and rendering to
    ``get_sql_results``.

    :returns: whatever ``get_sql_results`` returns for the submitted query
    :raises utils.CaravelSecurityException: if the current user lacks the
        ``all_datasource_access`` permission
    """
    # Authorise before parsing the payload or touching any database.
    if (
            not self.appbuilder.sm.has_access(
                'all_datasource_access', 'all_datasource_access')):
        raise utils.CaravelSecurityException(_(
            "This view requires the `all_datasource_access` permission"))

    data = json.loads(request.form.get('data'))
    sql = data.get('sql')
    database_id = data.get('database_id')

    # The query is executed exactly once, by get_sql_results; running it
    # inline here as well would only produce a result that gets discarded.
    # TODO @b.kyryliuk handle async, e.g. with Celery's task API:
    # get_sql_results.delay(database_id, sql)
    content = get_sql_results(database_id, sql)
    return content

@expose("/async_sql_status/")
def async_sql_status(self, userid=None):
    """Report the status of a user's asynchronous queries (not implemented)

    ``userid`` defaults to ``None`` because the route this view is exposed
    on carries no URL parameter; a required argument would make every
    request fail before the body runs.

    :param userid: id of the user whose queries are being inspected
    :returns: a JSON response with HTTP status 501 until this is implemented
    """
    # TODO @b.kyryliuk
    # A bare `return` is not a valid view result, so answer explicitly.
    return Response(
        json.dumps({'error': 'async_sql_status is not implemented yet'}),
        status=501,
        mimetype="application/json")

@has_access
@expose("/refresh_datasources/")
def refresh_datasources(self):
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
install_requires=[
'alembic>=0.8.5, <0.9.0',
'babel==2.3.4',
'celery==3.1.23',
'cryptography>=1.1.1, <2.0.0',
'flask-appbuilder>=1.7.1, <2.0.0',
'Flask-BabelPkg==0.9.6',
Expand Down