From c0c30751f12a9515d3276fefe4738feb60adda99 Mon Sep 17 00:00:00 2001
From: Allen Short
Date: Mon, 25 Mar 2019 22:02:43 -0500
Subject: [PATCH] Aggregate query results (re #35) (#339)

---
 .../app/components/queries/ScheduleDialog.jsx |  21 ++++-
 client/app/pages/queries/view.js              |   5 +-
 client/app/services/query-result.js           |  11 +++
 client/app/services/query.js                  |   6 +-
 migrations/versions/2ba47e9812b1_.py          |  24 ++++++
 migrations/versions/9d7678c47452_.py          |  34 ++++++++
 redash/handlers/api.py                        |   2 +
 redash/handlers/queries.py                    |   3 +
 redash/handlers/query_results.py              |  27 +++++++
 redash/models/__init__.py                     |  46 +++++++++++
 redash/serializers/__init__.py                |   1 +
 redash/tasks/queries.py                       |   1 +
 tests/factories.py                            |  12 ++-
 tests/handlers/test_queries.py                |  80 +++++++++++++++++++
 tests/test_models.py                          |  59 +++++++++++++-
 15 files changed, 323 insertions(+), 9 deletions(-)
 create mode 100644 migrations/versions/2ba47e9812b1_.py
 create mode 100644 migrations/versions/9d7678c47452_.py

diff --git a/client/app/components/queries/ScheduleDialog.jsx b/client/app/components/queries/ScheduleDialog.jsx
index 84245f481f..08b4a6ae4c 100644
--- a/client/app/components/queries/ScheduleDialog.jsx
+++ b/client/app/components/queries/ScheduleDialog.jsx
@@ -3,6 +3,7 @@ import PropTypes from 'prop-types';
 import Modal from 'antd/lib/modal';
 import DatePicker from 'antd/lib/date-picker';
 import TimePicker from 'antd/lib/time-picker';
+import InputNumber from 'antd/lib/input-number';
 import Select from 'antd/lib/select';
 import Radio from 'antd/lib/radio';
 import { capitalize, clone, isEqual, omitBy, isNil } from 'lodash';
@@ -60,10 +61,12 @@ class ScheduleDialog extends React.Component {
     schedule: RefreshScheduleType,
     refreshOptions: PropTypes.arrayOf(PropTypes.number).isRequired,
     dialog: DialogPropType.isRequired,
+    resultsetSize: PropTypes.number,
   };

   static defaultProps = {
     schedule: RefreshScheduleDefault,
+    resultsetSize: 1,
   };

   state = this.getState();
@@ -81,6 +84,7 @@ class ScheduleDialog extends React.Component {
       interval,
       dayOfWeek: day ? WEEKDAYS_SHORT[WEEKDAYS_FULL.indexOf(day)] : null,
       newSchedule,
+      resultsetSize: this.props.resultsetSize,
     };
   }

@@ -175,6 +179,10 @@ class ScheduleDialog extends React.Component {
     this.setScheduleUntil(null, date);
   };

+  setResultsetSize = (resultsetSize) => {
+    this.setState({ resultsetSize });
+  }
+
   save() {
     const { newSchedule } = this.state;
     const hasChanged = () => {
@@ -186,9 +194,9 @@ class ScheduleDialog extends React.Component {
     // save if changed
     if (hasChanged()) {
       if (newSchedule.interval) {
-        this.props.dialog.close(clone(newSchedule));
+        this.props.dialog.close([clone(newSchedule), this.state.resultsetSize]);
       } else {
-        this.props.dialog.close(null);
+        this.props.dialog.close([null, this.state.resultsetSize]);
       }
     }
     this.props.dialog.dismiss();
@@ -202,6 +210,7 @@ class ScheduleDialog extends React.Component {
       hour,
       seconds,
       newSchedule: { until },
+      resultsetSize,
     } = this.state;

     return (
@@ -271,6 +280,14 @@ class ScheduleDialog extends React.Component {
         ) : null}
+        <section>
+          Number of query results to keep
+          <InputNumber
+            value={resultsetSize}
+            min={1}
+            onChange={this.setResultsetSize}
+          />
+        </section>
       </Modal>
     );
   }
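The dialog now resolves with a two-element array, `[schedule, resultsetSize]`, and the view controller (next hunk) persists the second element as `schedule_resultset_size`. On the wire that save reduces to an ordinary query update; a minimal sketch of the equivalent request, assuming a standard Redash user API key and base URL (both hypothetical here):

```python
import requests

BASE_URL = 'https://redash.example.com'   # hypothetical
API_KEY = 'user-api-key'                  # hypothetical

def set_resultset_size(query_id, size):
    """Persist how many past runs a scheduled query should retain."""
    resp = requests.post(
        '{}/api/queries/{}'.format(BASE_URL, query_id),
        headers={'Authorization': 'Key {}'.format(API_KEY)},
        json={'schedule_resultset_size': size},  # field introduced by this patch
    )
    resp.raise_for_status()
    return resp.json()['schedule_resultset_size']
```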
diff --git a/client/app/pages/queries/view.js b/client/app/pages/queries/view.js
index 70fd36d955..1512f80d97 100644
--- a/client/app/pages/queries/view.js
+++ b/client/app/pages/queries/view.js
@@ -234,6 +234,7 @@ function QueryViewCtrl(
     } else {
       request = pick($scope.query, [
         'schedule',
+        'schedule_resultset_size',
         'query',
         'id',
         'description',
@@ -503,8 +504,10 @@ function QueryViewCtrl(
     ScheduleDialog.showModal({
       schedule: $scope.query.schedule,
       refreshOptions: $scope.refreshOptions,
-    }).result.then((schedule) => {
+      resultsetSize: $scope.query.schedule_resultset_size,
+    }).result.then(([schedule, resultsetSize]) => {
       $scope.query.schedule = schedule;
+      $scope.query.schedule_resultset_size = resultsetSize;
       $scope.saveQuery();
     });
   };
diff --git a/client/app/services/query-result.js b/client/app/services/query-result.js
index 6ec8f68601..ded4fdf558 100644
--- a/client/app/services/query-result.js
+++ b/client/app/services/query-result.js
@@ -38,6 +38,7 @@ function getColumnFriendlyName(column) {
 function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
   const QueryResultResource = $resource('api/query_results/:id', { id: '@id' }, { post: { method: 'POST' } });
   const QueryResultByQueryIdResource = $resource('api/queries/:queryId/results/:id.json', { queryId: '@queryId', id: '@id' });
+  const QueryResultSetResource = $resource('api/queries/:id/resultset', { id: '@id' });
   const Job = $resource('api/jobs/:id', { id: '@id' });
   const JobWithApiKey = $resource('api/queries/:queryId/jobs/:id', { queryId: '@queryId', id: '@id' });
   const statuses = {
@@ -296,6 +297,16 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
     return queryResult;
   }

+  static getResultSet(queryId) {
+    const queryResult = new QueryResult();
+
+    QueryResultSetResource.get({ id: queryId }, (response) => {
+      queryResult.update(response);
+    });
+
+    return queryResult;
+  }
+
   loadLatestCachedResult(queryId, parameters) {
     $resource('api/queries/:id/results', { id: '@queryId' }, { post: { method: 'POST' } })
       .post({ queryId, parameters },
diff --git a/client/app/services/query.js b/client/app/services/query.js
index c86ae7ffeb..11c1ae87a9 100644
--- a/client/app/services/query.js
+++ b/client/app/services/query.js
@@ -722,7 +722,11 @@ function QueryResource(
       this.latest_query_data_id = null;
     }

-    if (this.latest_query_data && maxAge !== 0) {
+    if (this.schedule_resultset_size > 1) {
+      if (!this.queryResult) {
+        this.queryResult = QueryResult.getResultSet(this.id);
+      }
+    } else if (this.latest_query_data && maxAge !== 0) {
       if (!this.queryResult) {
         this.queryResult = new QueryResult({
           query_result: this.latest_query_data,
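`QueryResult.getResultSet` drives the new resultset endpoint registered in redash/handlers/api.py below. Outside Angular, the same fetch looks like this; a sketch assuming the endpoint returns the envelope the handler tests at the end of this patch assert:

```python
import requests

def get_aggregated_rows(base_url, api_key, query_id):
    """Fetch the concatenated rows of a query's retained runs."""
    resp = requests.get(
        '{}/api/queries/{}/resultset'.format(base_url, query_id),
        headers={'Authorization': 'Key {}'.format(api_key)},
    )
    resp.raise_for_status()  # a 404 here means the query doesn't aggregate
    data = resp.json()['query_result']['data']
    return data['columns'], data['rows']
```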
diff --git a/migrations/versions/2ba47e9812b1_.py b/migrations/versions/2ba47e9812b1_.py
new file mode 100644
index 0000000000..93d0f59268
--- /dev/null
+++ b/migrations/versions/2ba47e9812b1_.py
@@ -0,0 +1,24 @@
+"""empty message
+
+Revision ID: 2ba47e9812b1
+Revises: 71477dadd6ef, 9d7678c47452
+Create Date: 2018-07-25 16:09:54.769289
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '2ba47e9812b1'
+down_revision = ('71477dadd6ef', '9d7678c47452', )
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    pass
+
+
+def downgrade():
+    pass
diff --git a/migrations/versions/9d7678c47452_.py b/migrations/versions/9d7678c47452_.py
new file mode 100644
index 0000000000..d351153c87
--- /dev/null
+++ b/migrations/versions/9d7678c47452_.py
@@ -0,0 +1,34 @@
+"""Incremental query results aggregation
+
+Revision ID: 9d7678c47452
+Revises: 15041b7085fe
+Create Date: 2018-03-08 04:36:12.802199
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '9d7678c47452'
+down_revision = '15041b7085fe'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.create_table('query_resultsets',
+                    sa.Column('query_id', sa.Integer(), nullable=False),
+                    sa.Column('result_id', sa.Integer(), nullable=False),
+                    sa.ForeignKeyConstraint(['query_id'], ['queries.id'], ),
+                    sa.ForeignKeyConstraint(['result_id'], ['query_results.id'], ),
+                    sa.PrimaryKeyConstraint('query_id', 'result_id')
+                    )
+    op.add_column(u'queries', sa.Column('schedule_resultset_size', sa.Integer(), nullable=True))
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column(u'queries', 'schedule_resultset_size')
+    op.drop_table('query_resultsets')
+    # ### end Alembic commands ###
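2ba47e9812b1 is a pure merge revision: it stitches the feature head 9d7678c47452 together with 71477dadd6ef so Alembic is left with a single head. A quick sanity check of that invariant, assuming Flask-Migrate's usual migrations/alembic.ini layout (paths hypothetical; run from the repo root):

```python
from alembic.config import Config
from alembic.script import ScriptDirectory

cfg = Config('migrations/alembic.ini')              # hypothetical config path
cfg.set_main_option('script_location', 'migrations')
script = ScriptDirectory.from_config(cfg)
# After the merge revision there must be exactly one head.
assert script.get_heads() == ['2ba47e9812b1'], script.get_heads()
```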
diff --git a/redash/handlers/api.py b/redash/handlers/api.py
index 90ab2172e6..ee21a92a6e 100644
--- a/redash/handlers/api.py
+++ b/redash/handlers/api.py
@@ -43,6 +43,7 @@
                                           QueryResultDropdownResource,
                                           QueryDropdownsResource,
                                           QueryResultListResource,
+                                          QueryResultSetResource,
                                           QueryResultResource)
 from redash.handlers.query_snippets import (QuerySnippetListResource,
                                             QuerySnippetResource)
@@ -121,6 +122,7 @@ def json_representation(data, code, headers=None):
 api.add_org_resource(QueryRegenerateApiKeyResource,
                      '/api/queries/<query_id>/regenerate_api_key',
                      endpoint='query_regenerate_api_key')
+api.add_org_resource(QueryResultSetResource, '/api/queries/<query_id>/resultset', endpoint='query_aggregate_results')
 api.add_org_resource(QueryVersionListResource, '/api/queries/<query_id>/version', endpoint='query_versions')
 api.add_org_resource(ChangeResource, '/api/changes/<change_id>', endpoint='changes')

diff --git a/redash/handlers/queries.py b/redash/handlers/queries.py
index d35a70fb88..bdaba408ca 100644
--- a/redash/handlers/queries.py
+++ b/redash/handlers/queries.py
@@ -198,6 +198,7 @@ def post(self):
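The diffstat lists 27 new lines in redash/handlers/query_results.py, but that hunk (and the body of the queries.py hunk above) is not shown in this patch text. A minimal sketch of what a `QueryResultSetResource` could look like, inferred from the tests at the end of the patch: 404 for queries that do not aggregate, otherwise the newest `schedule_resultset_size` runs concatenated oldest-first. The merging logic here is hypothetical; `BaseResource`, `get_object_or_404` and `require_permission` are existing redash helpers:

```python
import json
from flask import abort
from redash import models
from redash.handlers.base import BaseResource, get_object_or_404
from redash.permissions import require_permission


class QueryResultSetResource(BaseResource):
    @require_permission('view_query')
    def get(self, query_id):
        query = get_object_or_404(models.Query.get_by_id_and_org,
                                  query_id, self.current_org)
        if not query.schedule_resultset_size:
            abort(404)  # non-aggregating queries expose no resultset
        # Newest N runs, concatenated oldest-first (hypothetical merging code).
        retained = query.query_results[-query.schedule_resultset_size:]
        rows, columns = [], None
        for query_result in retained:
            data = json.loads(query_result.data)
            columns = columns or data['columns']  # assumes a stable schema
            rows.extend(data['rows'])
        return {'query_result': {'data': {'columns': columns, 'rows': rows}}}
```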
diff --git a/redash/models/__init__.py b/redash/models/__init__.py
--- a/redash/models/__init__.py
+++ b/redash/models/__init__.py
@@ ... @@ def store_result(cls, org, data_source, query_hash, query, data, run_time, retrieved_at):
+            if q.schedule_resultset_size > 0:
+                q.query_results.append(query_result)
         query_ids = [q.id for q in queries]
         logging.info("Updated %s queries with result (%s).", len(query_ids), query_hash)
@@ -375,6 +378,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
     data_source = db.relationship(DataSource, backref='queries')
     latest_query_data_id = Column(db.Integer, db.ForeignKey("query_results.id"), nullable=True)
     latest_query_data = db.relationship(QueryResult)
+    query_results = db.relationship("QueryResult", secondary="query_resultsets")
     name = Column(db.String(255))
     description = Column(db.String(4096), nullable=True)
     query_text = Column("query", db.Text)
@@ -389,6 +393,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
     is_draft = Column(db.Boolean, default=True, index=True)
     schedule = Column(MutableDict.as_mutable(PseudoJSON), nullable=True)
     schedule_failures = Column(db.Integer, default=0)
+    schedule_resultset_size = Column(db.Integer, nullable=True)
     visualizations = db.relationship("Visualization", cascade="all, delete-orphan")
     options = Column(MutableDict.as_mutable(PseudoJSON), default={})
     search_vector = Column(TSVectorType('id', 'name', 'description', 'query',
@@ -599,6 +604,37 @@ def search(cls, term, group_ids, user_id=None, include_drafts=False,
         # sort the result using the weight as defined in the search vector column
         return all_queries.search(term, sort=True).limit(limit)

+    @classmethod
+    def delete_stale_resultsets(cls):
+        delete_count = 0
+        texts = [c[0] for c in db.session.query(Query.query_text)
+                 .filter(Query.schedule_resultset_size != None).distinct()]
+        for text in texts:
+            queries = (Query.query.filter(Query.query_text == text,
+                                          Query.schedule_resultset_size != None)
+                       .order_by(Query.schedule_resultset_size.desc()))
+            # Multiple queries with the same text may request multiple result sets
+            # be kept. We start with the one that keeps the most, and delete both
+            # the unneeded bridge rows and result sets.
+            first_query = queries.first()
+            if first_query is not None and first_query.schedule_resultset_size:
+                resultsets = QueryResultSet.query.filter(QueryResultSet.query_rel == first_query).order_by(QueryResultSet.result_id)
+                resultset_count = resultsets.count()
+                if resultset_count > first_query.schedule_resultset_size:
+                    n_to_delete = resultset_count - first_query.schedule_resultset_size
+                    r_ids = [r.result_id for r in resultsets][:n_to_delete]
+                    QueryResultSet.query.filter(QueryResultSet.result_id.in_(r_ids)).delete(synchronize_session=False)
+                    delete_count += QueryResult.query.filter(QueryResult.id.in_(r_ids)).delete(synchronize_session=False)
+            # By this point there are no stale result sets left.
+            # Delete unneeded bridge rows for the remaining queries.
+            for q in queries[1:]:
+                resultsets = db.session.query(QueryResultSet.result_id).filter(QueryResultSet.query_rel == q).order_by(QueryResultSet.result_id)
+                n_to_delete = resultsets.count() - q.schedule_resultset_size
+                if n_to_delete > 0:
+                    stale_r = QueryResultSet.query.filter(QueryResultSet.result_id.in_(resultsets.limit(n_to_delete).subquery()))
+                    stale_r.delete(synchronize_session=False)
+        return delete_count
+
     @classmethod
     def search_by_user(cls, term, user, limit=None):
         return cls.by_user(user).search(term, sort=True).limit(limit)
@@ -706,6 +742,16 @@ def dashboard_api_keys(self):
         return [api_key[0] for api_key in api_keys]


+class QueryResultSet(db.Model):
+    query_id = Column(db.Integer, db.ForeignKey("queries.id"),
+                      primary_key=True)
+    query_rel = db.relationship(Query)
+    result_id = Column(db.Integer, db.ForeignKey("query_results.id"),
+                       primary_key=True)
+    result = db.relationship(QueryResult)
+    __tablename__ = 'query_resultsets'
+
+
 @listens_for(Query.query_text, 'set')
 def gen_query_hash(target, val, oldval, initiator):
     target.query_hash = utils.gen_query_hash(val)
diff --git a/redash/serializers/__init__.py b/redash/serializers/__init__.py
index 0dc8b6a9f9..76fb954460 100644
--- a/redash/serializers/__init__.py
+++ b/redash/serializers/__init__.py
@@ -93,6 +93,7 @@ def serialize_query(query, with_stats=False, with_visualizations=False, with_use
         'query': query.query_text,
         'query_hash': query.query_hash,
         'schedule': query.schedule,
+        'schedule_resultset_size': query.schedule_resultset_size,
         'api_key': query.api_key,
         'is_archived': query.is_archived,
         'is_draft': query.is_draft,
diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py
index 0ee9cd0ab0..d99700f0d4 100644
--- a/redash/tasks/queries.py
+++ b/redash/tasks/queries.py
@@ -248,6 +248,7 @@ def cleanup_query_results():
     deleted_count = models.QueryResult.query.filter(
         models.QueryResult.id.in_(unused_query_results.subquery())
     ).delete(synchronize_session=False)
+    deleted_count += models.Query.delete_stale_resultsets()
     models.db.session.commit()

     logger.info("Deleted %d unused query results.", deleted_count)
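`delete_stale_resultsets` keeps, per query text, only the newest `schedule_resultset_size` results. The windowing itself is easier to see without SQLAlchemy; a self-contained toy model of the same retention policy:

```python
from collections import deque

class ResultWindow:
    """Keep at most `size` result ids; evict the oldest first."""
    def __init__(self, size):
        self.size = size
        self.ids = deque()

    def append(self, result_id):
        self.ids.append(result_id)
        dropped = []
        while len(self.ids) > self.size:
            dropped.append(self.ids.popleft())  # eligible for deletion
        return dropped

w = ResultWindow(3)
assert [w.append(i) for i in (1, 2, 3)] == [[], [], []]
assert w.append(4) == [1]           # oldest run falls out once over capacity
assert list(w.ids) == [2, 3, 4]
```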
diff --git a/tests/factories.py b/tests/factories.py
index 2c82e186da..190418d1f0 100644
--- a/tests/factories.py
+++ b/tests/factories.py
@@ -111,7 +111,9 @@ def __call__(self):
                              query_hash=gen_query_hash('SELECT 1'),
                              data_source=data_source_factory.create,
                              org_id=1)
-
+query_resultset_factory = ModelFactory(redash.models.QueryResultSet,
+                                       query_rel=query_factory.create,
+                                       result=query_result_factory.create)
 visualization_factory = ModelFactory(redash.models.Visualization,
                                      type='CHART',
                                      query_rel=query_factory.create,
@@ -267,7 +269,10 @@ def create_query(self, **kwargs):
             'org': self.org
         }
         args.update(kwargs)
-        return query_factory.create(**args)
+        query = query_factory.create(**args)
+        if 'schedule_failures' in args:
+            query.schedule_failures = args['schedule_failures']
+        return query

     def create_query_with_params(self, **kwargs):
         args = {
@@ -297,6 +302,9 @@ def create_query_result(self, **kwargs):

         return query_result_factory.create(**args)

+    def create_query_resultset(self, **kwargs):
+        return query_resultset_factory.create(**kwargs)
+
     def create_visualization(self, **kwargs):
         args = {
             'query_rel': self.create_query()
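With these factories in place, the handler tests below pin down the endpoint's contract: a single column list, and the retained runs' rows concatenated in insertion order. A runnable miniature of that expectation:

```python
# Two runs of the same query, oldest first, each a list of row dicts.
runs = [
    [{'name': 'bob', 'color': 'green'}, {'name': 'fred', 'color': 'blue'}],
    [{'name': 'alice', 'color': 'red'}, {'name': 'eddie', 'color': 'orange'}],
]
merged = {'columns': ['name', 'color'],
          'rows': [row for run in runs for row in run]}
assert [r['name'] for r in merged['rows']] == ['bob', 'fred', 'alice', 'eddie']
```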
"SELECT x FROM mytable;" + q = self.factory.create_query(query_text=qtxt) + self.factory.create_query_result( + query_text=qtxt, + data=json.dumps({'columns': ['name', 'color'], + 'rows': [{'name': 'eve', 'color': 'grue'}, + {'name': 'mallory', 'color': 'bleen'}]})) + rv = self.make_request('get', '/api/queries/{}/resultset'.format(q.id)) + self.assertEqual(rv.status_code, 404) diff --git a/tests/test_models.py b/tests/test_models.py index 3e222e0d66..47107761d5 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -341,7 +341,8 @@ def test_deletes_alerts(self): class TestUnusedQueryResults(BaseTestCase): def test_returns_only_unused_query_results(self): two_weeks_ago = utcnow() - datetime.timedelta(days=14) - qr = self.factory.create_query_result() + qt = "SELECT 1" + qr = self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago) self.factory.create_query(latest_query_data=qr) db.session.flush() unused_qr = self.factory.create_query_result(retrieved_at=two_weeks_ago) @@ -350,13 +351,65 @@ def test_returns_only_unused_query_results(self): def test_returns_only_over_a_week_old_results(self): two_weeks_ago = utcnow() - datetime.timedelta(days=14) - unused_qr = self.factory.create_query_result(retrieved_at=two_weeks_ago) + qt = "SELECT 1" + unused_qr = self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago) db.session.flush() - new_unused_qr = self.factory.create_query_result() + new_unused_qr = self.factory.create_query_result(query_text=qt) self.assertIn(unused_qr, list(models.QueryResult.unused())) self.assertNotIn(new_unused_qr, list(models.QueryResult.unused())) + def test_doesnt_return_live_incremental_results(self): + two_weeks_ago = utcnow() - datetime.timedelta(days=14) + qt = "SELECT 1" + qrs = [self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago) + for _ in range(5)] + q = self.factory.create_query(query_text=qt, latest_query_data=qrs[0], + schedule_resultset_size=3) + for qr in qrs: + self.factory.create_query_resultset(query_rel=q, result=qr) + db.session.flush() + self.assertEqual([], list(models.QueryResult.unused())) + + def test_deletes_stale_resultsets(self): + qt = "SELECT 17" + query = self.factory.create_query(query_text=qt, + schedule_resultset_size=5) + for _ in range(10): + r = self.factory.create_query_result(query_text=qt) + self.factory.create_query_resultset(query_rel=query, result=r) + qt2 = "SELECT 100" + query2 = self.factory.create_query(query_text=qt2, schedule_resultset_size=5) + for _ in range(10): + r = self.factory.create_query_result(query_text=qt2) + self.factory.create_query_resultset(query_rel=query2, result=r) + db.session.flush() + self.assertEqual(models.QueryResultSet.query.count(), 20) + self.assertEqual(models.Query.delete_stale_resultsets(), 10) + self.assertEqual(models.QueryResultSet.query.count(), 10) + + def test_deletes_stale_resultsets_with_dupe_queries(self): + qt = "SELECT 17" + query = self.factory.create_query(query_text=qt, + schedule_resultset_size=5) + for _ in range(10): + r = self.factory.create_query_result(query_text=qt) + self.factory.create_query_resultset(query_rel=query, result=r) + query2 = self.factory.create_query(query_text=qt, + schedule_resultset_size=3) + for _ in range(10): + self.factory.create_query_result(query_text=qt) + self.factory.create_query_resultset(query_rel=query2) + qt2 = "SELECT 100" + query3 = self.factory.create_query(query_text=qt2, schedule_resultset_size=5) + for _ in range(10): + r = 
diff --git a/tests/test_models.py b/tests/test_models.py
index 3e222e0d66..47107761d5 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -341,7 +341,8 @@ def test_deletes_alerts(self):
 class TestUnusedQueryResults(BaseTestCase):
     def test_returns_only_unused_query_results(self):
         two_weeks_ago = utcnow() - datetime.timedelta(days=14)
-        qr = self.factory.create_query_result()
+        qt = "SELECT 1"
+        qr = self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago)
         self.factory.create_query(latest_query_data=qr)
         db.session.flush()
         unused_qr = self.factory.create_query_result(retrieved_at=two_weeks_ago)
@@ -350,13 +351,65 @@ def test_returns_only_unused_query_results(self):

     def test_returns_only_over_a_week_old_results(self):
         two_weeks_ago = utcnow() - datetime.timedelta(days=14)
-        unused_qr = self.factory.create_query_result(retrieved_at=two_weeks_ago)
+        qt = "SELECT 1"
+        unused_qr = self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago)
         db.session.flush()
-        new_unused_qr = self.factory.create_query_result()
+        new_unused_qr = self.factory.create_query_result(query_text=qt)

         self.assertIn(unused_qr, list(models.QueryResult.unused()))
         self.assertNotIn(new_unused_qr, list(models.QueryResult.unused()))

+    def test_doesnt_return_live_incremental_results(self):
+        two_weeks_ago = utcnow() - datetime.timedelta(days=14)
+        qt = "SELECT 1"
+        qrs = [self.factory.create_query_result(query_text=qt, retrieved_at=two_weeks_ago)
+               for _ in range(5)]
+        q = self.factory.create_query(query_text=qt, latest_query_data=qrs[0],
+                                      schedule_resultset_size=3)
+        for qr in qrs:
+            self.factory.create_query_resultset(query_rel=q, result=qr)
+        db.session.flush()
+        self.assertEqual([], list(models.QueryResult.unused()))
+
+    def test_deletes_stale_resultsets(self):
+        qt = "SELECT 17"
+        query = self.factory.create_query(query_text=qt,
+                                          schedule_resultset_size=5)
+        for _ in range(10):
+            r = self.factory.create_query_result(query_text=qt)
+            self.factory.create_query_resultset(query_rel=query, result=r)
+        qt2 = "SELECT 100"
+        query2 = self.factory.create_query(query_text=qt2, schedule_resultset_size=5)
+        for _ in range(10):
+            r = self.factory.create_query_result(query_text=qt2)
+            self.factory.create_query_resultset(query_rel=query2, result=r)
+        db.session.flush()
+        self.assertEqual(models.QueryResultSet.query.count(), 20)
+        self.assertEqual(models.Query.delete_stale_resultsets(), 10)
+        self.assertEqual(models.QueryResultSet.query.count(), 10)
+
+    def test_deletes_stale_resultsets_with_dupe_queries(self):
+        qt = "SELECT 17"
+        query = self.factory.create_query(query_text=qt,
+                                          schedule_resultset_size=5)
+        for _ in range(10):
+            r = self.factory.create_query_result(query_text=qt)
+            self.factory.create_query_resultset(query_rel=query, result=r)
+        query2 = self.factory.create_query(query_text=qt,
+                                           schedule_resultset_size=3)
+        for _ in range(10):
+            self.factory.create_query_result(query_text=qt)
+            self.factory.create_query_resultset(query_rel=query2)
+        qt2 = "SELECT 100"
+        query3 = self.factory.create_query(query_text=qt2, schedule_resultset_size=5)
+        for _ in range(10):
+            r = self.factory.create_query_result(query_text=qt2)
+            self.factory.create_query_resultset(query_rel=query3, result=r)
+        db.session.flush()
+        self.assertEqual(models.QueryResultSet.query.count(), 30)
+        self.assertEqual(models.Query.delete_stale_resultsets(), 10)
+        self.assertEqual(models.QueryResultSet.query.count(), 13)
+

 class TestQueryAll(BaseTestCase):
     def test_returns_only_queries_in_given_groups(self):
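The expected counts in the dupe-queries test follow directly from the retention rules: for the shared text only the largest window (5) actually deletes QueryResults, the smaller window (3) only drops its extra bridge rows, and the third query trims its own text independently:

```python
runs = 10
keep_shared_big, keep_shared_small, keep_other = 5, 3, 5

# QueryResults deleted: the biggest window per text, plus the separate text.
results_deleted = (runs - keep_shared_big) + (runs - keep_other)      # 5 + 5
# Bridge rows remaining: each query ends up with exactly its window size.
bridge_rows_left = keep_shared_big + keep_shared_small + keep_other   # 5 + 3 + 5
assert (results_deleted, bridge_rows_left) == (10, 13)
```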