From 6fb3b305ad23c27d0555ded2ab80820000fdec50 Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Thu, 20 Oct 2016 23:40:24 -0700 Subject: [PATCH] [sqllab] add support for results backends (#1377) * [sqllab] add support for results backends Long running SQL queries (beyond the scope of a web request) can now use a k/v store to hold their result sets. * Addressing comments, fixed js tests * Fixing mysql has gone away * Adressing more comments * Touchups --- caravel/__init__.py | 31 +-- caravel/assets/javascripts/SqlLab/actions.js | 130 ++++++++-- caravel/assets/javascripts/SqlLab/common.js | 4 + .../javascripts/SqlLab/components/App.jsx | 20 +- .../SqlLab/components/DataPreviewModal.jsx | 58 +++++ .../{SqlShrink.jsx => HighlightedSql.jsx} | 29 ++- .../SqlLab/components/QueryTable.jsx | 69 ++++-- .../SqlLab/components/ResultSet.jsx | 227 +++++++++++++----- .../SqlLab/components/SouthPane.jsx | 83 ++----- .../SqlLab/components/SqlEditor.jsx | 58 +---- .../SqlLab/components/TabbedSqlEditors.jsx | 25 +- .../SqlLab/components/TableElement.jsx | 52 +++- .../SqlLab/components/VisualizeModal.jsx | 2 - caravel/assets/javascripts/SqlLab/index.jsx | 6 +- caravel/assets/javascripts/SqlLab/main.css | 16 ++ caravel/assets/javascripts/SqlLab/reducers.js | 31 ++- .../components/CopyToClipboard.jsx | 18 +- .../javascripts/components/ModalTrigger.jsx | 14 +- .../javascripts/sqllab/TableElement_spec.jsx | 2 +- caravel/config.py | 5 + caravel/db_engine_specs.py | 39 +++ .../7e3ddad2a00b_results_key_to_query.py | 22 ++ caravel/models.py | 3 + caravel/results_backends.py | 49 ++++ caravel/sql_lab.py | 84 +++---- caravel/utils.py | 17 ++ caravel/views.py | 51 +++- 27 files changed, 784 insertions(+), 361 deletions(-) create mode 100644 caravel/assets/javascripts/SqlLab/components/DataPreviewModal.jsx rename caravel/assets/javascripts/SqlLab/components/{SqlShrink.jsx => HighlightedSql.jsx} (57%) create mode 100644 caravel/migrations/versions/7e3ddad2a00b_results_key_to_query.py create mode 100644 caravel/results_backends.py diff --git a/caravel/__init__.py b/caravel/__init__.py index bb74091a714a5..d53b8e579ca12 100644 --- a/caravel/__init__.py +++ b/caravel/__init__.py @@ -10,12 +10,12 @@ from flask import Flask, redirect from flask_appbuilder import SQLA, AppBuilder, IndexView -from sqlalchemy import event, exc from flask_appbuilder.baseviews import expose from flask_cache import Cache from flask_migrate import Migrate from caravel.source_registry import SourceRegistry from werkzeug.contrib.fixers import ProxyFix +from caravel import utils APP_DIR = os.path.dirname(__file__) @@ -31,33 +31,7 @@ db = SQLA(app) -@event.listens_for(db.engine, 'checkout') -def checkout(dbapi_con, con_record, con_proxy): - """ - Making sure the connection is live, and preventing against: - 'MySQL server has gone away' - - Copied from: - http://stackoverflow.com/questions/30630120/mysql-keeps-losing-connection-during-celery-tasks - """ - try: - try: - if hasattr(dbapi_con, 'ping'): - dbapi_con.ping(False) - else: - cursor = dbapi_con.cursor() - cursor.execute("SELECT 1") - except TypeError: - app.logger.debug('MySQL connection died. Restoring...') - dbapi_con.ping() - except dbapi_con.OperationalError as e: - app.logger.warning(e) - if e.args[0] in (2006, 2013, 2014, 2045, 2055): - raise exc.DisconnectionError() - else: - raise - return db - +utils.pessimistic_connection_handling(db.engine.pool) cache = Cache(app, config=app.config.get('CACHE_CONFIG')) @@ -103,6 +77,7 @@ def index(self): sm = appbuilder.sm get_session = appbuilder.get_session +results_backend = app.config.get("RESULTS_BACKEND") # Registering sources module_datasource_map = app.config.get("DEFAULT_MODULE_DS_MAP") diff --git a/caravel/assets/javascripts/SqlLab/actions.js b/caravel/assets/javascripts/SqlLab/actions.js index b03a30d259623..f805dce41141b 100644 --- a/caravel/assets/javascripts/SqlLab/actions.js +++ b/caravel/assets/javascripts/SqlLab/actions.js @@ -1,17 +1,17 @@ +import shortid from 'shortid'; +import { now } from '../modules/dates'; +const $ = require('jquery'); + export const RESET_STATE = 'RESET_STATE'; export const ADD_QUERY_EDITOR = 'ADD_QUERY_EDITOR'; export const CLONE_QUERY_TO_NEW_TAB = 'CLONE_QUERY_TO_NEW_TAB'; export const REMOVE_QUERY_EDITOR = 'REMOVE_QUERY_EDITOR'; export const MERGE_TABLE = 'MERGE_TABLE'; export const REMOVE_TABLE = 'REMOVE_TABLE'; -export const START_QUERY = 'START_QUERY'; -export const STOP_QUERY = 'STOP_QUERY'; export const END_QUERY = 'END_QUERY'; export const REMOVE_QUERY = 'REMOVE_QUERY'; export const EXPAND_TABLE = 'EXPAND_TABLE'; export const COLLAPSE_TABLE = 'COLLAPSE_TABLE'; -export const QUERY_SUCCESS = 'QUERY_SUCCESS'; -export const QUERY_FAILED = 'QUERY_FAILED'; export const QUERY_EDITOR_SETDB = 'QUERY_EDITOR_SETDB'; export const QUERY_EDITOR_SET_SCHEMA = 'QUERY_EDITOR_SET_SCHEMA'; export const QUERY_EDITOR_SET_TITLE = 'QUERY_EDITOR_SET_TITLE'; @@ -25,11 +25,117 @@ export const ADD_ALERT = 'ADD_ALERT'; export const REMOVE_ALERT = 'REMOVE_ALERT'; export const REFRESH_QUERIES = 'REFRESH_QUERIES'; export const SET_NETWORK_STATUS = 'SET_NETWORK_STATUS'; +export const RUN_QUERY = 'RUN_QUERY'; +export const START_QUERY = 'START_QUERY'; +export const STOP_QUERY = 'STOP_QUERY'; +export const REQUEST_QUERY_RESULTS = 'REQUEST_QUERY_RESULTS'; +export const QUERY_SUCCESS = 'QUERY_SUCCESS'; +export const QUERY_FAILED = 'QUERY_FAILED'; +export const CLEAR_QUERY_RESULTS = 'CLEAR_QUERY_RESULTS'; +export const HIDE_DATA_PREVIEW = 'HIDE_DATA_PREVIEW'; export function resetState() { return { type: RESET_STATE }; } +export function startQuery(query) { + Object.assign(query, { + id: shortid.generate(), + progress: 0, + startDttm: now(), + state: (query.runAsync) ? 'pending' : 'running', + }); + return { type: START_QUERY, query }; +} + +export function querySuccess(query, results) { + return { type: QUERY_SUCCESS, query, results }; +} + +export function queryFailed(query, msg) { + return { type: QUERY_FAILED, query, msg }; +} + +export function stopQuery(query) { + return { type: STOP_QUERY, query }; +} + +export function clearQueryResults(query) { + return { type: CLEAR_QUERY_RESULTS, query }; +} + +export function hideDataPreview() { + return { type: HIDE_DATA_PREVIEW }; +} + +export function requestQueryResults(query) { + return { type: REQUEST_QUERY_RESULTS, query }; +} + +export function fetchQueryResults(query) { + return function (dispatch) { + dispatch(requestQueryResults(query)); + const sqlJsonUrl = `/caravel/results/${query.resultsKey}/`; + $.ajax({ + type: 'GET', + dataType: 'json', + url: sqlJsonUrl, + success(results) { + dispatch(querySuccess(query, results)); + }, + error() { + dispatch(queryFailed(query, 'Failed at retrieving results from the results backend')); + }, + }); + }; +} + +export function runQuery(query) { + return function (dispatch) { + dispatch(startQuery(query)); + const sqlJsonUrl = '/caravel/sql_json/'; + const sqlJsonRequest = { + client_id: query.id, + database_id: query.dbId, + json: true, + runAsync: query.runAsync, + schema: query.schema, + sql: query.sql, + sql_editor_id: query.sqlEditorId, + tab: query.tab, + tmp_table_name: query.tempTableName, + select_as_cta: query.ctas, + }; + $.ajax({ + type: 'POST', + dataType: 'json', + url: sqlJsonUrl, + data: sqlJsonRequest, + success(results) { + if (!query.runAsync) { + dispatch(querySuccess(query, results)); + } + }, + error(err, textStatus, errorThrown) { + let msg; + try { + msg = err.responseJSON.error; + } catch (e) { + if (err.responseText !== undefined) { + msg = err.responseText; + } + } + if (textStatus === 'error' && errorThrown === '') { + msg = 'Could not connect to server'; + } else if (msg === null) { + msg = `[${textStatus}] ${errorThrown}`; + } + dispatch(queryFailed(query, msg)); + }, + }); + }; +} + export function setDatabases(databases) { return { type: SET_DATABASES, databases }; } @@ -102,22 +208,6 @@ export function removeTable(table) { return { type: REMOVE_TABLE, table }; } -export function startQuery(query) { - return { type: START_QUERY, query }; -} - -export function stopQuery(query) { - return { type: STOP_QUERY, query }; -} - -export function querySuccess(query, results) { - return { type: QUERY_SUCCESS, query, results }; -} - -export function queryFailed(query, msg) { - return { type: QUERY_FAILED, query, msg }; -} - export function addWorkspaceQuery(query) { return { type: ADD_WORKSPACE_QUERY, query }; } diff --git a/caravel/assets/javascripts/SqlLab/common.js b/caravel/assets/javascripts/SqlLab/common.js index 6d0ab5f5979df..4005c591a727f 100644 --- a/caravel/assets/javascripts/SqlLab/common.js +++ b/caravel/assets/javascripts/SqlLab/common.js @@ -1,8 +1,12 @@ export const STATE_BSSTYLE_MAP = { failed: 'danger', pending: 'info', + fetching: 'info', running: 'warning', + stopped: 'danger', success: 'success', }; +export const DATA_PREVIEW_ROW_COUNT = 100; + export const STATUS_OPTIONS = ['success', 'failed', 'running']; diff --git a/caravel/assets/javascripts/SqlLab/components/App.jsx b/caravel/assets/javascripts/SqlLab/components/App.jsx index 9cf62e4559647..074ef63b98e39 100644 --- a/caravel/assets/javascripts/SqlLab/components/App.jsx +++ b/caravel/assets/javascripts/SqlLab/components/App.jsx @@ -5,6 +5,7 @@ import TabbedSqlEditors from './TabbedSqlEditors'; import QueryAutoRefresh from './QueryAutoRefresh'; import QuerySearch from './QuerySearch'; import Alerts from './Alerts'; +import DataPreviewModal from './DataPreviewModal'; import { bindActionCreators } from 'redux'; import { connect } from 'react-redux'; @@ -26,8 +27,9 @@ class App extends React.Component { this.setState({ hash: window.location.hash }); } render() { + let content; if (this.state.hash) { - return ( + content = (
@@ -37,16 +39,18 @@ class App extends React.Component {
); } + content = ( +
+ + +
+ ); return (
+ +
- - -
-
- -
-
+ {content}
); diff --git a/caravel/assets/javascripts/SqlLab/components/DataPreviewModal.jsx b/caravel/assets/javascripts/SqlLab/components/DataPreviewModal.jsx new file mode 100644 index 0000000000000..989b94148ce9f --- /dev/null +++ b/caravel/assets/javascripts/SqlLab/components/DataPreviewModal.jsx @@ -0,0 +1,58 @@ +import * as Actions from '../actions'; +import React from 'react'; +import { bindActionCreators } from 'redux'; +import { connect } from 'react-redux'; +import { Modal } from 'react-bootstrap'; + +import ResultSet from './ResultSet'; + +const propTypes = { + queries: React.PropTypes.object, + actions: React.PropTypes.object, + showDataPreviewModal: React.PropTypes.bool, + dataPreviewQueryId: React.PropTypes.string, +}; + +class DataPreviewModal extends React.Component { + hide() { + this.props.actions.hideDataPreview(); + } + render() { + if (this.props.showDataPreviewModal && this.props.dataPreviewQueryId) { + const query = this.props.queries[this.props.dataPreviewQueryId]; + return ( + + + + Data preview for {query.tableName} + + + + + + + ); + } + return null; + } +} +DataPreviewModal.propTypes = propTypes; + +function mapStateToProps(state) { + return { + queries: state.queries, + showDataPreviewModal: state.showDataPreviewModal, + dataPreviewQueryId: state.dataPreviewQueryId, + }; +} +function mapDispatchToProps(dispatch) { + return { + actions: bindActionCreators(Actions, dispatch), + }; +} + +export default connect(mapStateToProps, mapDispatchToProps)(DataPreviewModal); diff --git a/caravel/assets/javascripts/SqlLab/components/SqlShrink.jsx b/caravel/assets/javascripts/SqlLab/components/HighlightedSql.jsx similarity index 57% rename from caravel/assets/javascripts/SqlLab/components/SqlShrink.jsx rename to caravel/assets/javascripts/SqlLab/components/HighlightedSql.jsx index c67dfc469df42..c3db7870ad22b 100644 --- a/caravel/assets/javascripts/SqlLab/components/SqlShrink.jsx +++ b/caravel/assets/javascripts/SqlLab/components/HighlightedSql.jsx @@ -2,38 +2,43 @@ import React from 'react'; import SyntaxHighlighter from 'react-syntax-highlighter'; import { github } from 'react-syntax-highlighter/dist/styles'; -const SqlShrink = (props) => { +const HighlightedSql = (props) => { const sql = props.sql || ''; let lines = sql.split('\n'); if (lines.length >= props.maxLines) { lines = lines.slice(0, props.maxLines); lines.push('{...}'); } - const shrunk = lines.map((line) => { - if (line.length > props.maxWidth) { - return line.slice(0, props.maxWidth) + '{...}'; - } - return line; - }) - .join('\n'); + let shownSql = sql; + if (props.shrink) { + shownSql = lines.map((line) => { + if (line.length > props.maxWidth) { + return line.slice(0, props.maxWidth) + '{...}'; + } + return line; + }) + .join('\n'); + } return (
- {shrunk} + {shownSql}
); }; -SqlShrink.defaultProps = { +HighlightedSql.defaultProps = { maxWidth: 60, maxLines: 6, + shrink: false, }; -SqlShrink.propTypes = { +HighlightedSql.propTypes = { sql: React.PropTypes.string, maxWidth: React.PropTypes.number, maxLines: React.PropTypes.number, + shrink: React.PropTypes.bool, }; -export default SqlShrink; +export default HighlightedSql; diff --git a/caravel/assets/javascripts/SqlLab/components/QueryTable.jsx b/caravel/assets/javascripts/SqlLab/components/QueryTable.jsx index 3f80058be6db0..5d75a98e236e7 100644 --- a/caravel/assets/javascripts/SqlLab/components/QueryTable.jsx +++ b/caravel/assets/javascripts/SqlLab/components/QueryTable.jsx @@ -6,14 +6,30 @@ import * as Actions from '../actions'; import moment from 'moment'; import { Table } from 'reactable'; -import { ProgressBar } from 'react-bootstrap'; +import { Label, ProgressBar } from 'react-bootstrap'; import Link from './Link'; import VisualizeModal from './VisualizeModal'; -import SqlShrink from './SqlShrink'; +import ResultSet from './ResultSet'; +import ModalTrigger from '../../components/ModalTrigger'; +import HighlightedSql from './HighlightedSql'; import { STATE_BSSTYLE_MAP } from '../common'; import { fDuration } from '../../modules/dates'; import { getLink } from '../../../utils/common'; +const propTypes = { + columns: React.PropTypes.array, + actions: React.PropTypes.object, + queries: React.PropTypes.array, + onUserClicked: React.PropTypes.func, + onDbClicked: React.PropTypes.func, +}; +const defaultProps = { + columns: ['started', 'duration', 'rows'], + queries: [], + onUserClicked: () => {}, + onDbClicked: () => {}, +}; + class QueryTable extends React.Component { constructor(props) { @@ -45,6 +61,12 @@ class QueryTable extends React.Component { openQueryInNewTab(query) { this.props.actions.cloneQueryToNewTab(query); } + openAsyncResults(query) { + this.props.actions.fetchQueryResults(query); + } + clearQueryResults(query) { + this.props.actions.clearQueryResults(query); + } render() { const data = this.props.queries.map((query) => { @@ -71,9 +93,30 @@ class QueryTable extends React.Component { q.started = moment(q.startDttm).format('HH:mm:ss'); const source = (q.ctas) ? q.executedSql : q.sql; q.sql = ( - + ); - q.output = q.tempTable; + if (q.resultsKey) { + q.output = ( + + view results + + )} + modalTitle={'Data preview'} + beforeOpen={this.openAsyncResults.bind(this, query)} + onExit={this.clearQueryResults.bind(this, query)} + modalBody={} + /> + ); + } else { + q.output = q.tempTable; + } q.progress = ( +
{}, - onDbClicked: () => {}, -}; +QueryTable.propTypes = propTypes; +QueryTable.defaultProps = defaultProps; function mapStateToProps() { return {}; @@ -174,4 +206,5 @@ function mapDispatchToProps(dispatch) { actions: bindActionCreators(Actions, dispatch), }; } +export { QueryTable }; export default connect(mapStateToProps, mapDispatchToProps)(QueryTable); diff --git a/caravel/assets/javascripts/SqlLab/components/ResultSet.jsx b/caravel/assets/javascripts/SqlLab/components/ResultSet.jsx index 566193ff2f634..e8c4375b91a31 100644 --- a/caravel/assets/javascripts/SqlLab/components/ResultSet.jsx +++ b/caravel/assets/javascripts/SqlLab/components/ResultSet.jsx @@ -1,8 +1,32 @@ import React from 'react'; -import { Alert, Button, ButtonGroup } from 'react-bootstrap'; +import { Alert, Button, ButtonGroup, ProgressBar } from 'react-bootstrap'; import { Table } from 'reactable'; +import shortid from 'shortid'; + +import { connect } from 'react-redux'; +import { bindActionCreators } from 'redux'; +import * as Actions from '../actions'; import VisualizeModal from './VisualizeModal'; +import HighlightedSql from './HighlightedSql'; + +const propTypes = { + actions: React.PropTypes.object, + csv: React.PropTypes.bool, + query: React.PropTypes.object, + search: React.PropTypes.bool, + searchText: React.PropTypes.string, + showSql: React.PropTypes.bool, + visualize: React.PropTypes.bool, +}; +const defaultProps = { + search: true, + visualize: true, + showSql: false, + csv: true, + searchText: '', + actions: {}, +}; class ResultSet extends React.Component { @@ -13,83 +37,166 @@ class ResultSet extends React.Component { showModal: false, }; } - changeSearch(event) { - this.setState({ searchText: event.target.value }); - } - showModal() { - this.setState({ showModal: true }); - } - hideModal() { - this.setState({ showModal: false }); - } - render() { - const results = this.props.query.results; - let controls =
; - if (this.props.showControls) { - controls = ( + getControls() { + if (this.props.search || this.props.visualize || this.props.csv) { + let csvButton; + if (this.props.csv) { + csvButton = ( + + ); + } + let visualizeButton; + if (this.props.visualize) { + visualizeButton = ( + + ); + } + let searchBox; + if (this.props.search) { + searchBox = ( + + ); + } + return (
- - + {visualizeButton} + {csvButton}
- + {searchBox}
); } - if (results && results.data && results.data.length > 0) { + return
; + } + popSelectStar() { + const qe = { + id: shortid.generate(), + title: this.props.query.tempTable, + autorun: false, + dbId: this.props.query.dbId, + sql: `SELECT * FROM ${this.props.query.tempTable}`, + }; + this.props.actions.addQueryEditor(qe); + } + showModal() { + this.setState({ showModal: true }); + } + hideModal() { + this.setState({ showModal: false }); + } + changeSearch(event) { + this.setState({ searchText: event.target.value }); + } + fetchResults(query) { + this.props.actions.fetchQueryResults(query); + } + render() { + const query = this.props.query; + const results = query.results; + let sql; + if (this.props.showSql) { + sql = ; + } + if (['running', 'pending', 'fetching'].includes(query.state)) { + let progressBar; + if (query.progress > 0 && query.state === 'running') { + progressBar = ( + ); + } return (
- - {controls} -
- col.name)} - sortable - className="table table-condensed table-bordered" - filterBy={this.state.searchText} - filterable={results.columns} - hideFilterInput - /> - + Loading... + {progressBar} ); + } else if (query.state === 'failed') { + return {query.errorMessage}; + } else if (query.state === 'success' && query.ctas) { + return ( +
+ + Table [{query.tempTable}] was + created   + + +
); + } else if (query.state === 'success') { + if (results && results.data && results.data.length > 0) { + return ( +
+ + {this.getControls.bind(this)()} + {sql} +
+
col.name)} + sortable + className="table table-condensed table-bordered" + filterBy={this.state.searchText} + filterable={results.columns.map((c) => c.name)} + hideFilterInput + /> + + + ); + } else if (query.resultsKey) { + return ( +
+ This query was run asynchronously   + + +
+ ); + } } return (The query returned no data); } } -ResultSet.propTypes = { - query: React.PropTypes.object, - showControls: React.PropTypes.bool, - search: React.PropTypes.bool, - searchText: React.PropTypes.string, -}; -ResultSet.defaultProps = { - showControls: true, - search: true, - searchText: '', -}; +ResultSet.propTypes = propTypes; +ResultSet.defaultProps = defaultProps; -export default ResultSet; +function mapStateToProps() { + return {}; +} +function mapDispatchToProps(dispatch) { + return { + actions: bindActionCreators(Actions, dispatch), + }; +} +export default connect(mapStateToProps, mapDispatchToProps)(ResultSet); diff --git a/caravel/assets/javascripts/SqlLab/components/SouthPane.jsx b/caravel/assets/javascripts/SqlLab/components/SouthPane.jsx index 9723899d37a41..1941cc93101b0 100644 --- a/caravel/assets/javascripts/SqlLab/components/SouthPane.jsx +++ b/caravel/assets/javascripts/SqlLab/components/SouthPane.jsx @@ -1,4 +1,4 @@ -import { Alert, Button, Tab, Tabs } from 'react-bootstrap'; +import { Alert, Tab, Tabs } from 'react-bootstrap'; import QueryHistory from './QueryHistory'; import ResultSet from './ResultSet'; import React from 'react'; @@ -8,66 +8,29 @@ import { bindActionCreators } from 'redux'; import * as Actions from '../actions'; import shortid from 'shortid'; -class SouthPane extends React.Component { - popSelectStar() { - const qe = { - id: shortid.generate(), - title: this.props.latestQuery.tempTable, - autorun: false, - dbId: this.props.latestQuery.dbId, - sql: `SELECT * FROM ${this.props.latestQuery.tempTable}`, - }; - this.props.actions.addQueryEditor(qe); +const SouthPane = function (props) { + let results =
; + const latestQuery = props.latestQuery; + if (latestQuery) { + results = ; + } else { + results = Run a query to display results here; } - render() { - let results =
; - const latestQuery = this.props.latestQuery; - if (latestQuery) { - if (['running', 'pending'].includes(latestQuery.state)) { - results = ( - Loading.. - ); - } else if (latestQuery.state === 'failed') { - results = {latestQuery.errorMessage}; - } else if (latestQuery.state === 'success' && latestQuery.ctas) { - results = ( -
- - Table [{latestQuery.tempTable}] was created - -

- - -

-
); - } else if (latestQuery.state === 'success') { - results = ; - } - } else { - results = Run a query to display results here; - } - return ( -
- - -
- {results} -
-
- - - -
-
- ); - } -} + return ( +
+ + +
+ {results} +
+
+ + + +
+
+ ); +}; SouthPane.propTypes = { latestQuery: React.PropTypes.object, diff --git a/caravel/assets/javascripts/SqlLab/components/SqlEditor.jsx b/caravel/assets/javascripts/SqlLab/components/SqlEditor.jsx index 4f69b9ce253ed..a3abdc36ee5f5 100644 --- a/caravel/assets/javascripts/SqlLab/components/SqlEditor.jsx +++ b/caravel/assets/javascripts/SqlLab/components/SqlEditor.jsx @@ -1,5 +1,3 @@ -const $ = require('jquery'); -import { now } from '../../modules/dates'; import React from 'react'; import { Button, @@ -53,64 +51,16 @@ class SqlEditor extends React.Component { this.startQuery(runAsync); } startQuery(runAsync = false, ctas = false) { - const that = this; const query = { dbId: this.props.queryEditor.dbId, - id: shortid.generate(), - progress: 0, sql: this.props.queryEditor.sql, sqlEditorId: this.props.queryEditor.id, - startDttm: now(), - state: 'running', tab: this.props.queryEditor.title, - }; - if (runAsync) { - query.state = 'pending'; - } - - // Execute the Query - that.props.actions.startQuery(query); - - const sqlJsonUrl = '/caravel/sql_json/'; - const sqlJsonRequest = { - client_id: query.id, - database_id: this.props.queryEditor.dbId, - json: true, + tempTableName: this.state.ctas, runAsync, - schema: this.props.queryEditor.schema, - select_as_cta: ctas, - sql: this.props.queryEditor.sql, - sql_editor_id: this.props.queryEditor.id, - tab: this.props.queryEditor.title, - tmp_table_name: this.state.ctas, + ctas, }; - $.ajax({ - type: 'POST', - dataType: 'json', - url: sqlJsonUrl, - data: sqlJsonRequest, - success(results) { - if (!runAsync) { - that.props.actions.querySuccess(query, results); - } - }, - error(err, textStatus, errorThrown) { - let msg; - try { - msg = err.responseJSON.error; - } catch (e) { - if (err.responseText !== undefined) { - msg = err.responseText; - } - } - if (textStatus === 'error' && errorThrown === '') { - msg = 'Could not connect to server'; - } else if (msg === null) { - msg = `[${textStatus}] ${errorThrown}`; - } - that.props.actions.queryFailed(query, msg); - }, - }); + this.props.actions.runQuery(query); } stopQuery() { this.props.actions.stopQuery(this.props.latestQuery); @@ -180,7 +130,7 @@ class SqlEditor extends React.Component { {runButtons} ); - if (this.props.latestQuery && this.props.latestQuery.state === 'running') { + if (this.props.latestQuery && ['running', 'pending'].includes(this.props.latestQuery.state)) { runButtons = ( ); diff --git a/caravel/assets/javascripts/components/ModalTrigger.jsx b/caravel/assets/javascripts/components/ModalTrigger.jsx index ddac1e366b0f8..4a12e23a1f503 100644 --- a/caravel/assets/javascripts/components/ModalTrigger.jsx +++ b/caravel/assets/javascripts/components/ModalTrigger.jsx @@ -7,12 +7,18 @@ const propTypes = { modalTitle: PropTypes.node.isRequired, modalBody: PropTypes.node.isRequired, beforeOpen: PropTypes.func, + onExit: PropTypes.func, isButton: PropTypes.bool, + bsSize: PropTypes.string, + className: PropTypes.string, }; const defaultProps = { beforeOpen: () => {}, + onExit: () => {}, isButton: false, + bsSize: null, + className: '', }; export default class ModalTrigger extends React.Component { @@ -42,7 +48,13 @@ export default class ModalTrigger extends React.Component { return ( {this.props.triggerNode} - + {this.props.modalTitle} diff --git a/caravel/assets/spec/javascripts/sqllab/TableElement_spec.jsx b/caravel/assets/spec/javascripts/sqllab/TableElement_spec.jsx index fce61bb10f9d8..a1a8448730828 100644 --- a/caravel/assets/spec/javascripts/sqllab/TableElement_spec.jsx +++ b/caravel/assets/spec/javascripts/sqllab/TableElement_spec.jsx @@ -149,7 +149,7 @@ describe('TableElement', () => { }); it('has 3 Link elements', () => { const wrapper = shallow(); - expect(wrapper.find(Link)).to.have.length(3); + expect(wrapper.find(Link)).to.have.length(2); }); it('has 14 columns', () => { const wrapper = shallow(); diff --git a/caravel/config.py b/caravel/config.py index 71b754234ccee..cad17ed6fb731 100644 --- a/caravel/config.py +++ b/caravel/config.py @@ -234,6 +234,11 @@ class CeleryConfig(object): # Timeout duration for SQL Lab synchronous queries SQLLAB_TIMEOUT = 30 +# An instantiated derivative of werkzeug.contrib.cache.BaseCache +# if enabled, it can be used to store the results of long-running queries +# in SQL Lab by using the "Run Async" button/feature +RESULTS_BACKEND = None + try: from caravel_config import * # noqa except ImportError: diff --git a/caravel/db_engine_specs.py b/caravel/db_engine_specs.py index 4f24e2af5b4c1..dfb86905f366f 100644 --- a/caravel/db_engine_specs.py +++ b/caravel/db_engine_specs.py @@ -19,15 +19,23 @@ from collections import namedtuple import inspect import textwrap +import time from flask_babel import lazy_gettext as _ Grain = namedtuple('Grain', 'name label function') +class LimitMethod(object): + """Enum the ways that limits can be applied""" + FETCH_MANY = 'fetch_many' + WRAP_SQL = 'wrap_sql' + + class BaseEngineSpec(object): engine = 'base' # str as defined in sqlalchemy.engine.engine time_grains = tuple() + limit_method = LimitMethod.FETCH_MANY @classmethod def epoch_to_dttm(cls): @@ -46,6 +54,15 @@ def extra_table_metadata(cls, database, table_name, schema_name): def convert_dttm(cls, target_type, dttm): return "'{}'".format(dttm.strftime('%Y-%m-%d %H:%M:%S')) + @classmethod + def handle_cursor(cls, cursor, query): + """Handle a live cursor between the execute and fetchall calls + + The flow works without this method doing anything, but it allows + for handling the cursor and updating progress information in the + query object""" + pass + class PostgresEngineSpec(BaseEngineSpec): engine = 'postgres' @@ -201,6 +218,28 @@ def extra_table_metadata(cls, database, table_name, schema_name): } } + @classmethod + def handle_cursor(cls, cursor, query, session): + """Updates progress information""" + polled = cursor.poll() + # poll returns dict -- JSON status information or ``None`` + # if the query is done + # https://github.com/dropbox/PyHive/blob/ + # b34bdbf51378b3979eaf5eca9e956f06ddc36ca0/pyhive/presto.py#L178 + while polled: + # Update the object and wait for the kill signal. + stats = polled.get('stats', {}) + if stats: + completed_splits = float(stats.get('completedSplits')) + total_splits = float(stats.get('totalSplits')) + if total_splits and completed_splits: + progress = 100 * (completed_splits / total_splits) + if progress > query.progress: + query.progress = progress + session.commit() + time.sleep(1) + polled = cursor.poll() + class MssqlEngineSpec(BaseEngineSpec): engine = 'mssql' diff --git a/caravel/migrations/versions/7e3ddad2a00b_results_key_to_query.py b/caravel/migrations/versions/7e3ddad2a00b_results_key_to_query.py new file mode 100644 index 0000000000000..f2a46085631da --- /dev/null +++ b/caravel/migrations/versions/7e3ddad2a00b_results_key_to_query.py @@ -0,0 +1,22 @@ +"""results_key to query + +Revision ID: 7e3ddad2a00b +Revises: b46fa1b0b39e +Create Date: 2016-10-14 11:17:54.995156 + +""" + +# revision identifiers, used by Alembic. +revision = '7e3ddad2a00b' +down_revision = 'b46fa1b0b39e' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('query', sa.Column('results_key', sa.String(length=64), nullable=True)) + + +def downgrade(): + op.drop_column('query', 'results_key') diff --git a/caravel/models.py b/caravel/models.py index fdb2c3ae6f815..1d53de828f807 100644 --- a/caravel/models.py +++ b/caravel/models.py @@ -2272,6 +2272,8 @@ class Query(Model): # # of rows in the result set or rows modified. rows = Column(Integer) error_message = Column(Text) + # key used to store the results in the results backend + results_key = Column(String(64)) # Using Numeric in place of DateTime for sub-second precision # stored as seconds since epoch, allowing for milliseconds @@ -2320,6 +2322,7 @@ def to_dict(self): 'userId': self.user_id, 'user': self.user.username, 'limit_reached': self.limit_reached, + 'resultsKey': self.results_key, } @property diff --git a/caravel/results_backends.py b/caravel/results_backends.py new file mode 100644 index 0000000000000..714ed66b15c6f --- /dev/null +++ b/caravel/results_backends.py @@ -0,0 +1,49 @@ +"""Results backends are used to store long-running query results + +The Abstraction is flask-cache, which uses the BaseCache class from werkzeug +""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from werkzeug.contrib.cache import BaseCache + + +class S3Cache(BaseCache): + + """S3 cache""" + + def __init__(self, default_timeout=300): + self.default_timeout = default_timeout + + def get(self, key): + return None + + def delete(self, key): + return True + + def set(self, key, value, timeout=None): + return True + + def add(self, key, value, timeout=None): + """Works like :meth:`set` but does not overwrite the values of already + existing keys. + :param key: the key to set + :param value: the value for the key + :param timeout: the cache timeout for the key in seconds (if not + specified, it uses the default timeout). A timeout of + 0 idicates that the cache never expires. + :returns: Same as :meth:`set`, but also ``False`` for already + existing keys. + :rtype: boolean + """ + return True + + def clear(self): + """Clears the cache. Keep in mind that not all caches support + completely clearing the cache. + :returns: Whether the cache has been cleared. + :rtype: boolean + """ + return True diff --git a/caravel/sql_lab.py b/caravel/sql_lab.py index 97fa11ac20203..23f3eb09a1060 100644 --- a/caravel/sql_lab.py +++ b/caravel/sql_lab.py @@ -2,20 +2,22 @@ from datetime import datetime import pandas as pd import logging -import numpy -import time - -from caravel import app, db, models, utils, dataframe +import json +import uuid +import zlib +from caravel import ( + app, db, models, utils, dataframe, results_backend) +from caravel.db_engine_specs import LimitMethod QueryStatus = models.QueryStatus - celery_app = celery.Celery(config_source=app.config.get('CELERY_CONFIG')) def is_query_select(sql): return sql.upper().startswith('SELECT') + def create_table_as(sql, table_name, schema=None, override=False): """Reformats the query into the create table as query. @@ -44,14 +46,14 @@ def create_table_as(sql, table_name, schema=None, override=False): @celery_app.task -def get_sql_results(query_id, return_results=True): +def get_sql_results(query_id, return_results=True, store_results=False): """Executes the sql query returns the results.""" session = db.session() session.commit() # HACK query = session.query(models.Query).filter_by(id=query_id).one() database = query.database executed_sql = query.sql.strip().strip(';') - + db_engine_spec = database.db_engine_spec def handle_error(msg): """Local method handling error while processing the SQL""" @@ -79,7 +81,9 @@ def handle_error(msg): executed_sql = create_table_as( executed_sql, query.tmp_table_name, database.force_ctas_schema) query.select_as_cta_used = True - elif query.limit and is_select: + elif ( + query.limit and is_select and + db_engine_spec.limit_method == LimitMethod.WRAP_SQL): executed_sql = database.wrap_sql_limit(executed_sql, query.limit) query.limit_used = True engine = database.get_sqla_engine(schema=query.schema) @@ -94,36 +98,17 @@ def handle_error(msg): cursor = result_proxy.cursor query.status = QueryStatus.RUNNING session.flush() - if database.backend == 'presto': - polled = cursor.poll() - # poll returns dict -- JSON status information or ``None`` - # if the query is done - # https://github.com/dropbox/PyHive/blob/ - # b34bdbf51378b3979eaf5eca9e956f06ddc36ca0/pyhive/presto.py#L178 - while polled: - # Update the object and wait for the kill signal. - stats = polled.get('stats', {}) - if stats: - completed_splits = float(stats.get('completedSplits')) - total_splits = float(stats.get('totalSplits')) - if total_splits and completed_splits: - progress = 100 * (completed_splits / total_splits) - if progress > query.progress: - query.progress = progress - session.commit() - time.sleep(1) - polled = cursor.poll() + db_engine_spec.handle_cursor(cursor, query) cdf = None if result_proxy.cursor: column_names = [col[0] for col in result_proxy.cursor.description] - data = result_proxy.fetchall() + if db_engine_spec.limit_method == LimitMethod.FETCH_MANY: + data = result_proxy.fetchmany(query.limit) + else: + data = result_proxy.fetchall() cdf = dataframe.CaravelDataFrame( pd.DataFrame(data, columns=column_names)) - # TODO consider generating tuples instead of dicts to send - # less data through the wire. The command bellow does that, - # but we'd need to align on the client side. - # data = df.values.tolist() query.rows = result_proxy.rowcount query.progress = 100 @@ -135,23 +120,26 @@ def handle_error(msg): query.select_sql = '{}'.format(database.select_star( query.tmp_table_name, limit=query.limit)) query.end_time = utils.now_as_float() + session.flush() + + payload = { + 'query_id': query.id, + 'status': query.status, + 'data': [], + } + payload['data'] = cdf.data if cdf else [] + payload['columns'] = cdf.columns_dict if cdf else [] + payload['query'] = query.to_dict() + payload = json.dumps(payload, default=utils.json_iso_dttm_ser) + + if store_results and results_backend: + key = '{}'.format(uuid.uuid4()) + logging.info("Storing results in results backend, key: {}".format(key)) + results_backend.set(key, zlib.compress(payload)) + query.results_key = key + + session.flush() session.commit() if return_results: - payload = { - 'query_id': query.id, - 'status': query.status, - 'data': [], - } - if query.status == models.QueryStatus.SUCCESS: - payload['data'] = cdf.data if cdf else [] - payload['columns'] = cdf.columns_dict if cdf else [] - else: - payload['error'] = query.error_message return payload - ''' - # Hack testing using a kv store for results - key = "query_id={}".format(query.id) - logging.info("Storing results in key=[{}]".format(key)) - cache.set(key, json.dumps(payload, default=utils.json_iso_dttm_ser)) - ''' diff --git a/caravel/utils.py b/caravel/utils.py index f685ec5017d9d..91ac4943d5999 100644 --- a/caravel/utils.py +++ b/caravel/utils.py @@ -15,6 +15,8 @@ import signal import uuid +from sqlalchemy import event, exc +from sqlalchemy.pool import Pool import parsedatetime import sqlalchemy as sa from dateutil.parser import parse @@ -505,3 +507,18 @@ def wrap_clause_in_parens(sql): if sql.strip(): sql = '({})'.format(sql) return sa.text(sql) + + +def pessimistic_connection_handling(target): + @event.listens_for(target, "checkout") + def ping_connection(dbapi_connection, connection_record, connection_proxy): + """ + Disconnect Handling - Pessimistic, taken from: + http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html + """ + cursor = dbapi_connection.cursor() + try: + cursor.execute("SELECT 1") + except: + raise exc.DisconnectionError() + cursor.close() diff --git a/caravel/views.py b/caravel/views.py index 5ab1dd4ef0ee6..4dfb4e3818746 100755 --- a/caravel/views.py +++ b/caravel/views.py @@ -5,20 +5,19 @@ import json import logging -import os import pickle import re import sys import time import traceback +import zlib from datetime import datetime, timedelta import functools import sqlalchemy as sqla from flask import ( - g, request, make_response, redirect, flash, Response, render_template, - Markup, url_for) + g, request, redirect, flash, Response, render_template, Markup) from flask_appbuilder import ModelView, CompactCRUDMixin, BaseView, expose from flask_appbuilder.actions import action from flask_appbuilder.models.sqla.interface import SQLAInterface @@ -29,7 +28,6 @@ from flask_appbuilder.models.sqla.filters import BaseFilter from sqlalchemy import create_engine -from werkzeug import secure_filename from werkzeug.datastructures import ImmutableMultiDict from werkzeug.routing import BaseConverter from wtforms.validators import ValidationError @@ -37,7 +35,7 @@ import caravel from caravel import ( appbuilder, cache, db, models, viz, utils, app, - sm, ascii_art, sql_lab + sm, ascii_art, sql_lab, results_backend ) from caravel.source_registry import SourceRegistry from caravel.models import DatasourceAccessRequest as DAR @@ -78,6 +76,7 @@ class ListWidgetWithCheckboxes(ListWidget): ACCESS_REQUEST_MISSING_ERR = __( "The access requests seem to have been deleted") USER_MISSING_ERR = __("The user seems to have been deleted") +DATASOURCE_ACCESS_ERR = __("You don't have access to this datasource") def get_database_access_error_msg(database_name): @@ -661,6 +660,8 @@ def post_update(self, table): category_label=__("Sources"), icon='fa-table',) +appbuilder.add_separator("Sources") + class AccessRequestsModelView(CaravelModelView, DeleteMixin): datamodel = SQLAInterface(DAR) @@ -1248,7 +1249,7 @@ def explore_json(self, datasource_type, datasource_id): if not self.datasource_access(viz_obj.datasource): return Response( json.dumps( - {'error': _("You don't have access to this datasource")}), + {'error': DATASOURCE_ACCESS_ERR}), status=404, mimetype="application/json") @@ -2010,6 +2011,38 @@ def cached_key(self, key): return resp return "nope" + @has_access_api + @expose("/results//") + @log_this + def results(self, key): + """Serves a key off of the results backend""" + blob = results_backend.get(key) + if blob: + json_payload = zlib.decompress(blob) + obj = json.loads(json_payload) + db_id = obj['query']['dbId'] + session = db.session() + mydb = session.query(models.Database).filter_by(id=db_id).one() + + if not self.database_access(mydb): + json_error_response( + get_database_access_error_msg(mydb.database_name)) + + return Response( + json_payload, + status=200, + mimetype="application/json") + else: + return Response( + json.dumps({ + 'error': ( + "Data could not be retrived. You may want to " + "re-run the query." + ) + }), + status=410, + mimetype="application/json") + @has_access_api @expose("/sql_json/", methods=['POST', 'GET']) @log_this @@ -2052,7 +2085,8 @@ def sql_json(self): # Async request. if async: # Ignore the celery future object and the request may time out. - sql_lab.get_sql_results.delay(query_id, return_results=False) + sql_lab.get_sql_results.delay( + query_id, return_results=False, store_results=not query.select_as_cta) return Response( json.dumps({'query': query.to_dict()}, default=utils.json_int_dttm_ser, @@ -2077,9 +2111,8 @@ def sql_json(self): json.dumps({'error': "{}".format(e)}), status=500, mimetype="application/json") - data['query'] = query.to_dict() return Response( - json.dumps(data, default=utils.json_iso_dttm_ser), + data, status=200, mimetype="application/json")