Skip to content

Commit

Permalink
Aggregate query results (re #35) (#339)
Browse files Browse the repository at this point in the history
  • Loading branch information
Allen Short committed Sep 16, 2019
1 parent 2362a07 commit c0c3075
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 9 deletions.
21 changes: 19 additions & 2 deletions client/app/components/queries/ScheduleDialog.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import PropTypes from 'prop-types';
import Modal from 'antd/lib/modal';
import DatePicker from 'antd/lib/date-picker';
import TimePicker from 'antd/lib/time-picker';
import InputNumber from 'antd/lib/input-number';
import Select from 'antd/lib/select';
import Radio from 'antd/lib/radio';
import { capitalize, clone, isEqual, omitBy, isNil } from 'lodash';
Expand Down Expand Up @@ -60,10 +61,12 @@ class ScheduleDialog extends React.Component {
schedule: RefreshScheduleType,
refreshOptions: PropTypes.arrayOf(PropTypes.number).isRequired,
dialog: DialogPropType.isRequired,
resultsetSize: PropTypes.number,
};

static defaultProps = {
schedule: RefreshScheduleDefault,
resultsetSize: 1,
};

state = this.getState();
Expand All @@ -81,6 +84,7 @@ class ScheduleDialog extends React.Component {
interval,
dayOfWeek: day ? WEEKDAYS_SHORT[WEEKDAYS_FULL.indexOf(day)] : null,
newSchedule,
resultsetSize: this.props.resultsetSize,
};
}

Expand Down Expand Up @@ -175,6 +179,10 @@ class ScheduleDialog extends React.Component {
this.setScheduleUntil(null, date);
};

// Record how many past result sets the user wants kept; the value is
// returned to the caller together with the schedule in save().
// Fix: terminate the class-property assignment with ';' for consistency
// with the sibling handlers (e.g. setUntilToggle above).
setResultsetSize = (resultsetSize) => {
  this.setState({ resultsetSize });
};

save() {
const { newSchedule } = this.state;
const hasChanged = () => {
Expand All @@ -186,9 +194,9 @@ class ScheduleDialog extends React.Component {
// save if changed
if (hasChanged()) {
if (newSchedule.interval) {
this.props.dialog.close(clone(newSchedule));
this.props.dialog.close([clone(newSchedule), this.state.resultsetSize]);
} else {
this.props.dialog.close(null);
this.props.dialog.close([null, this.state.resultsetSize]);
}
}
this.props.dialog.dismiss();
Expand All @@ -202,6 +210,7 @@ class ScheduleDialog extends React.Component {
hour,
seconds,
newSchedule: { until },
resultsetSize,
} = this.state;

return (
Expand Down Expand Up @@ -271,6 +280,14 @@ class ScheduleDialog extends React.Component {
</div>
</div>
) : null}
Number of query results to keep
<InputNumber
className="form-control"
min={1}
defaultValue={resultsetSize || 1}
onChange={this.setResultsetSize}
/>

</Modal>
);
}
Expand Down
5 changes: 4 additions & 1 deletion client/app/pages/queries/view.js
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ function QueryViewCtrl(
} else {
request = pick($scope.query, [
'schedule',
'schedule_resultset_size',
'query',
'id',
'description',
Expand Down Expand Up @@ -503,8 +504,10 @@ function QueryViewCtrl(
ScheduleDialog.showModal({
schedule: $scope.query.schedule,
refreshOptions: $scope.refreshOptions,
}).result.then((schedule) => {
resultsetSize: $scope.query.schedule_resultset_size,
}).result.then(([schedule, resultsetSize]) => {
$scope.query.schedule = schedule;
$scope.query.schedule_resultset_size = resultsetSize;
$scope.saveQuery();
});
};
Expand Down
11 changes: 11 additions & 0 deletions client/app/services/query-result.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ function getColumnFriendlyName(column) {
function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
const QueryResultResource = $resource('api/query_results/:id', { id: '@id' }, { post: { method: 'POST' } });
const QueryResultByQueryIdResource = $resource('api/queries/:queryId/results/:id.json', { queryId: '@queryId', id: '@id' });
const QueryResultSetResource = $resource('api/queries/:id/resultset', { id: '@id' });
const Job = $resource('api/jobs/:id', { id: '@id' });
const JobWithApiKey = $resource('api/queries/:queryId/jobs/:id', { queryId: '@queryId', id: '@id' });
const statuses = {
Expand Down Expand Up @@ -296,6 +297,16 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
return queryResult;
}

static getResultSet(queryId) {
const queryResult = new QueryResult();

QueryResultSetResource.get({ id: queryId }, (response) => {
queryResult.update(response);
});

return queryResult;
}

loadLatestCachedResult(queryId, parameters) {
$resource('api/queries/:id/results', { id: '@queryId' }, { post: { method: 'POST' } })
.post({ queryId, parameters },
Expand Down
6 changes: 5 additions & 1 deletion client/app/services/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,11 @@ function QueryResource(
this.latest_query_data_id = null;
}

if (this.latest_query_data && maxAge !== 0) {
if (this.schedule_resultset_size > 1) {
if (!this.queryResult) {
this.queryResult = QueryResult.getResultSet(this.id);
}
} else if (this.latest_query_data && maxAge !== 0) {
if (!this.queryResult) {
this.queryResult = new QueryResult({
query_result: this.latest_query_data,
Expand Down
24 changes: 24 additions & 0 deletions migrations/versions/2ba47e9812b1_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""empty message
Revision ID: 2ba47e9812b1
Revises: 71477dadd6ef, 9d7678c47452
Create Date: 2018-07-25 16:09:54.769289
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '2ba47e9812b1'
down_revision = ('71477dadd6ef', '9d7678c47452', )
branch_labels = None
depends_on = None


def upgrade():
    # Merge revision only (down_revision names two parents); no schema changes.
    pass


def downgrade():
    # Nothing to undo: the matching upgrade() makes no schema changes.
    pass
34 changes: 34 additions & 0 deletions migrations/versions/9d7678c47452_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Incremental query results aggregation
Revision ID: 9d7678c47452
Revises: 15041b7085fe
Create Date: 2018-03-08 04:36:12.802199
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '9d7678c47452'
down_revision = '15041b7085fe'
branch_labels = None
depends_on = None


def upgrade():
    # Bridge table mapping a query to each historical result it retains.
    op.create_table(
        'query_resultsets',
        sa.Column('query_id', sa.Integer(), nullable=False),
        sa.Column('result_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['query_id'], ['queries.id'], ),
        sa.ForeignKeyConstraint(['result_id'], ['query_results.id'], ),
        sa.PrimaryKeyConstraint('query_id', 'result_id')
    )
    # Number of past result sets each query keeps (NULL = keep only one).
    op.add_column(u'queries', sa.Column('schedule_resultset_size', sa.Integer(), nullable=True))

def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    # Drop the retention column and the bridge table added in upgrade().
    op.drop_column(u'queries', 'schedule_resultset_size')
    op.drop_table('query_resultsets')
    # ### end Alembic commands ###
2 changes: 2 additions & 0 deletions redash/handlers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
QueryResultDropdownResource,
QueryDropdownsResource,
QueryResultListResource,
QueryResultSetResource,
QueryResultResource)
from redash.handlers.query_snippets import (QuerySnippetListResource,
QuerySnippetResource)
Expand Down Expand Up @@ -121,6 +122,7 @@ def json_representation(data, code, headers=None):
api.add_org_resource(QueryRegenerateApiKeyResource,
'/api/queries/<query_id>/regenerate_api_key',
endpoint='query_regenerate_api_key')
api.add_org_resource(QueryResultSetResource, '/api/queries/<query_id>/resultset', endpoint='query_aggregate_results')
api.add_org_resource(QueryVersionListResource, '/api/queries/<query_id>/version', endpoint='query_versions')
api.add_org_resource(ChangeResource, '/api/changes/<change_id>', endpoint='changes')

Expand Down
3 changes: 3 additions & 0 deletions redash/handlers/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def post(self):
:<json string name:
:<json string description:
:<json string schedule: Schedule interval, in seconds, for repeated execution of this query
:<json number schedule_resultset_size: Number of result sets to keep (null to keep only one)
:<json object options: Query options
.. _query-response-label:
Expand Down Expand Up @@ -235,6 +236,8 @@ def post(self):
query_def['data_source'] = data_source
query_def['org'] = self.current_org
query_def['is_draft'] = True
if query_def.get('schedule_resultset_size') == 1:
query_def['schedule_resultset_size'] = None
query = models.Query.create(**query_def)
query.record_changes(changed_by=self.current_user)
models.db.session.add(query)
Expand Down
27 changes: 27 additions & 0 deletions redash/handlers/query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,33 @@ def get(self, query_id, dropdown_query_id):
return dropdown_values(dropdown_query_id, self.current_org)


class QueryResultSetResource(BaseResource):
    @require_permission('view_query')
    def get(self, query_id=None, filetype='json'):
        """Serve the query's retained results merged into one synthetic result set.

        Responds 404 unless the query is configured to keep multiple results
        (schedule_resultset_size is set).
        """
        query = get_object_or_404(models.Query.get_by_id_and_org, query_id, self.current_org)
        if not query.schedule_resultset_size:
            abort(404, message="query does not keep multiple results")

        # Take the newest N retained results; a negative slice covers the
        # case where fewer than N results exist yet.
        recent = [qr.to_dict() for qr in
                  query.query_results[-query.schedule_resultset_size:]]

        if recent:
            # Clone the first result for its metadata, then replace its data
            # with the combined rows of every retained result (columns come
            # from that same first result).
            aggregate_result = dict(recent[0])
            aggregate_result['data'] = {
                'columns': recent[0]['data']['columns'],
                'rows': [row for r in recent for row in r['data']['rows']],
            }
        else:
            aggregate_result = {}

        body = json_dumps({'query_result': aggregate_result})
        return make_response(body, 200, {'Content-Type': "application/json"})


class QueryResultResource(BaseResource):
@staticmethod
def add_cors_headers(headers):
Expand Down
46 changes: 46 additions & 0 deletions redash/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ def unused(cls, days=7):
age_threshold = datetime.datetime.now() - datetime.timedelta(days=days)
return (
cls.query.filter(
~QueryResultSet.query.filter(QueryResultSet.result_id == QueryResult.id).exists(),
Query.id.is_(None),
cls.retrieved_at < age_threshold
)
Expand Down Expand Up @@ -316,6 +317,8 @@ def store_result(cls, org, data_source, query_hash, query, data, run_time, retri
# don't auto-update the updated_at timestamp
q.skip_updated_at = True
db.session.add(q)
if q.schedule_resultset_size > 0:
q.query_results.append(query_result)
query_ids = [q.id for q in queries]
logging.info("Updated %s queries with result (%s).", len(query_ids), query_hash)

Expand Down Expand Up @@ -375,6 +378,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
data_source = db.relationship(DataSource, backref='queries')
latest_query_data_id = Column(db.Integer, db.ForeignKey("query_results.id"), nullable=True)
latest_query_data = db.relationship(QueryResult)
query_results = db.relationship("QueryResult", secondary="query_resultsets")
name = Column(db.String(255))
description = Column(db.String(4096), nullable=True)
query_text = Column("query", db.Text)
Expand All @@ -389,6 +393,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
is_draft = Column(db.Boolean, default=True, index=True)
schedule = Column(MutableDict.as_mutable(PseudoJSON), nullable=True)
schedule_failures = Column(db.Integer, default=0)
schedule_resultset_size = Column(db.Integer, nullable=True)
visualizations = db.relationship("Visualization", cascade="all, delete-orphan")
options = Column(MutableDict.as_mutable(PseudoJSON), default={})
search_vector = Column(TSVectorType('id', 'name', 'description', 'query',
Expand Down Expand Up @@ -599,6 +604,37 @@ def search(cls, term, group_ids, user_id=None, include_drafts=False,
# sort the result using the weight as defined in the search vector column
return all_queries.search(term, sort=True).limit(limit)

@classmethod
def delete_stale_resultsets(cls):
    """Trim retained result sets down to each query's schedule_resultset_size.

    Returns the number of QueryResult rows deleted (bridge-row-only
    deletions are not counted).
    """
    delete_count = 0
    # Distinct texts of queries that keep multiple result sets.
    # Idiom fix: use .isnot(None) instead of `!= None` (flake8 E711);
    # SQLAlchemy renders both as IS NOT NULL, so behavior is unchanged.
    texts = [c[0] for c in db.session.query(Query.query_text)
             .filter(Query.schedule_resultset_size.isnot(None)).distinct()]
    for text in texts:
        queries = (Query.query.filter(Query.query_text == text,
                                      Query.schedule_resultset_size.isnot(None))
                   .order_by(Query.schedule_resultset_size.desc()))
        # Multiple queries with the same text may request multiple result sets
        # be kept. We start with the one that keeps the most, and delete both
        # the unneeded bridge rows and result sets.
        first_query = queries.first()
        if first_query is not None and first_query.schedule_resultset_size:
            resultsets = QueryResultSet.query.filter(QueryResultSet.query_rel == first_query).order_by(QueryResultSet.result_id)
            resultset_count = resultsets.count()
            if resultset_count > first_query.schedule_resultset_size:
                n_to_delete = resultset_count - first_query.schedule_resultset_size
                # Ascending result_id puts the oldest results first, so the
                # head of the list is what gets dropped.
                r_ids = [r.result_id for r in resultsets][:n_to_delete]
                QueryResultSet.query.filter(QueryResultSet.result_id.in_(r_ids)).delete(synchronize_session=False)
                delete_count += QueryResult.query.filter(QueryResult.id.in_(r_ids)).delete(synchronize_session=False)
        # By this point there are no stale result sets left.
        # Delete unneeded bridge rows for the remaining queries.
        for q in queries[1:]:
            resultsets = db.session.query(QueryResultSet.result_id).filter(QueryResultSet.query_rel == q).order_by(QueryResultSet.result_id)
            n_to_delete = resultsets.count() - q.schedule_resultset_size
            if n_to_delete > 0:
                stale_r = QueryResultSet.query.filter(QueryResultSet.result_id.in_(resultsets.limit(n_to_delete).subquery()))
                stale_r.delete(synchronize_session=False)
    return delete_count

@classmethod
def search_by_user(cls, term, user, limit=None):
    """Full-text search over queries, restricted to those visible to ``user``."""
    visible = cls.by_user(user)
    return visible.search(term, sort=True).limit(limit)
Expand Down Expand Up @@ -706,6 +742,16 @@ def dashboard_api_keys(self):
return [api_key[0] for api_key in api_keys]


class QueryResultSet(db.Model):
    """Bridge table linking a query to each historical result it retains."""
    __tablename__ = 'query_resultsets'

    query_id = Column(db.Integer, db.ForeignKey("queries.id"), primary_key=True)
    query_rel = db.relationship(Query)
    result_id = Column(db.Integer, db.ForeignKey("query_results.id"), primary_key=True)
    result = db.relationship(QueryResult)


@listens_for(Query.query_text, 'set')
def gen_query_hash(target, val, oldval, initiator):
target.query_hash = utils.gen_query_hash(val)
Expand Down
1 change: 1 addition & 0 deletions redash/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def serialize_query(query, with_stats=False, with_visualizations=False, with_use
'query': query.query_text,
'query_hash': query.query_hash,
'schedule': query.schedule,
'schedule_resultset_size': query.schedule_resultset_size,
'api_key': query.api_key,
'is_archived': query.is_archived,
'is_draft': query.is_draft,
Expand Down
1 change: 1 addition & 0 deletions redash/tasks/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def cleanup_query_results():
deleted_count = models.QueryResult.query.filter(
models.QueryResult.id.in_(unused_query_results.subquery())
).delete(synchronize_session=False)
deleted_count += models.Query.delete_stale_resultsets()
models.db.session.commit()
logger.info("Deleted %d unused query results.", deleted_count)

Expand Down
12 changes: 10 additions & 2 deletions tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ def __call__(self):
query_hash=gen_query_hash('SELECT 1'),
data_source=data_source_factory.create,
org_id=1)

# Factory for QueryResultSet bridge rows; creates a fresh query and
# query result for each row unless overridden by the caller.
query_resultset_factory = ModelFactory(redash.models.QueryResultSet,
                                       query_rel=query_factory.create,
                                       result=query_result_factory.create)
visualization_factory = ModelFactory(redash.models.Visualization,
type='CHART',
query_rel=query_factory.create,
Expand Down Expand Up @@ -267,7 +269,10 @@ def create_query(self, **kwargs):
'org': self.org
}
args.update(kwargs)
return query_factory.create(**args)
query = query_factory.create(**args)
if 'schedule_failures' in args:
query.schedule_failures = args['schedule_failures']
return query

def create_query_with_params(self, **kwargs):
args = {
Expand Down Expand Up @@ -297,6 +302,9 @@ def create_query_result(self, **kwargs):

return query_result_factory.create(**args)

def create_query_resultset(self, **kwargs):
    """Create a QueryResultSet bridge row; kwargs override the factory defaults."""
    return query_resultset_factory.create(**kwargs)

def create_visualization(self, **kwargs):
args = {
'query_rel': self.create_query()
Expand Down
Loading

0 comments on commit c0c3075

Please sign in to comment.