Skip to content

Commit

Permalink
Merge pull request #1085 from getredash/feature/pause-api
Browse files Browse the repository at this point in the history
Feature: API to pause a data source
  • Loading branch information
arikfr committed May 31, 2016
2 parents e831218 + 59f8af2 commit 857caab
Show file tree
Hide file tree
Showing 13 changed files with 210 additions and 13 deletions.
2 changes: 2 additions & 0 deletions rd_ui/app/scripts/services/resources.js
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@
}, function(error) {
if (error.status === 403) {
queryResult.update(error.data);
} else if (error.status === 400 && 'job' in error.data) {
queryResult.update(error.data);
}
});

Expand Down
3 changes: 2 additions & 1 deletion redash/handlers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from redash.handlers.base import org_scoped_rule
from redash.handlers.alerts import AlertResource, AlertListResource, AlertSubscriptionListResource, AlertSubscriptionResource
from redash.handlers.dashboards import DashboardListResource, RecentDashboardsResource, DashboardResource, DashboardShareResource
from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource
from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource, DataSourcePauseResource
from redash.handlers.events import EventResource
from redash.handlers.queries import QueryRefreshResource, QueryListResource, QueryRecentResource, QuerySearchResource, QueryResource
from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource
Expand Down Expand Up @@ -49,6 +49,7 @@ def json_representation(data, code, headers=None):
api.add_org_resource(DataSourceTypeListResource, '/api/data_sources/types', endpoint='data_source_types')
api.add_org_resource(DataSourceListResource, '/api/data_sources', endpoint='data_sources')
api.add_org_resource(DataSourceSchemaResource, '/api/data_sources/<data_source_id>/schema')
api.add_org_resource(DataSourcePauseResource, '/api/data_sources/<data_source_id>/pause')
api.add_org_resource(DataSourceResource, '/api/data_sources/<data_source_id>', endpoint='data_source')

api.add_org_resource(GroupListResource, '/api/groups', endpoint='groups')
Expand Down
35 changes: 35 additions & 0 deletions redash/handlers/data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,38 @@ def get(self, data_source_id):

return schema


class DataSourcePauseResource(BaseResource):
@require_admin
def post(self, data_source_id):
data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org)
data = request.get_json(force=True, silent=True)
if data:
reason = data.get('reason')
else:
reason = request.args.get('reason')

data_source.pause(reason)
data_source.save()

self.record_event({
'action': 'pause',
'object_id': data_source.id,
'object_type': 'datasource'
})

return data_source.to_dict()

@require_admin
def delete(self, data_source_id):
data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org)
data_source.resume()
data_source.save()

self.record_event({
'action': 'resume',
'object_id': data_source.id,
'object_type': 'datasource'
})

return data_source.to_dict()
15 changes: 13 additions & 2 deletions redash/handlers/query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@
from redash.tasks.queries import enqueue_query


def error_response(message):
return {'job': {'status': 4, 'error': message}}, 400


def run_query(data_source, parameter_values, query_text, query_id, max_age=0):
query_parameters = set(collect_query_parameters(query_text))
missing_params = set(query_parameters) - set(parameter_values.keys())
if missing_params:
return {'job': {'status': 4,
'error': 'Missing parameter value for: {}'.format(", ".join(missing_params))}}, 400
return error_response('Missing parameter value for: {}'.format(", ".join(missing_params)))

if data_source.paused:
if data_source.pause_reason:
message = '{} is paused ({}). Please try later.'.format(data_source.name, data_source.pause_reason)
else:
message = '{} is paused. Please try later.'.format(data_source.name)

return error_response(message)

if query_parameters:
query_text = pystache.render(query_text, parameter_values)
Expand Down
21 changes: 20 additions & 1 deletion redash/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,9 @@ def to_dict(self, all=False, with_permissions=False):
'id': self.id,
'name': self.name,
'type': self.type,
'syntax': self.query_runner.syntax
'syntax': self.query_runner.syntax,
'paused': self.paused,
'pause_reason': self.pause_reason
}

if all:
Expand Down Expand Up @@ -414,6 +416,23 @@ def get_schema(self, refresh=False):

return schema

def _pause_key(self):
return 'ds:{}:pause'.format(self.id)

@property
def paused(self):
return redis_connection.exists(self._pause_key())

@property
def pause_reason(self):
return redis_connection.get(self._pause_key())

def pause(self, reason=None):
redis_connection.set(self._pause_key(), reason)

def resume(self):
redis_connection.delete(self._pause_key())

def add_group(self, group, view_only=False):
dsg = DataSourceGroup.create(group=group, data_source=self, view_only=view_only)
setattr(self, 'data_source_groups', dsg)
Expand Down
23 changes: 15 additions & 8 deletions redash/tasks/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,13 @@ def refresh_queries():

with statsd_client.timer('manager.outdated_queries_lookup'):
for query in models.Query.outdated_queries():
enqueue_query(query.query, query.data_source,
scheduled=True,
metadata={'Query ID': query.id, 'Username': 'Scheduled'})
if query.data_source.paused:
logging.info("Skipping refresh of %s because datasource - %s is paused (%s).", query.id, query.data_source.name, query.data_source.pause_reason)
else:
enqueue_query(query.query, query.data_source,
scheduled=True,
metadata={'Query ID': query.id, 'Username': 'Scheduled'})

query_ids.append(query.id)
outdated_queries_count += 1

Expand Down Expand Up @@ -344,11 +348,14 @@ def refresh_schemas():
Refreshes the data sources schemas.
"""
for ds in models.DataSource.select():
logger.info("Refreshing schema for: {}".format(ds.name))
try:
ds.get_schema(refresh=True)
except Exception:
logger.exception("Failed refreshing schema for the data source: %s", ds.name)
if ds.paused:
logger.info(u"Skipping refresh schema of %s because it is paused (%s).", ds.name, ds.pause_reason)
else:
logger.info(u"Refreshing schema for: {}".format(ds.name))
try:
ds.get_schema(refresh=True)
except Exception:
logger.exception(u"Failed refreshing schema for the data source: %s", ds.name)


def signal_handler(*args):
Expand Down
3 changes: 3 additions & 0 deletions redash/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def update(self, new_config):
def get(self, *args, **kwargs):
return self._config.get(*args, **kwargs)

def __setitem__(self, key, value):
self._config[key] = value

def __getitem__(self, item):
if item in self._config:
return self._config[item]
Expand Down
3 changes: 2 additions & 1 deletion tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def __call__(self):
data_source_factory = ModelFactory(redash.models.DataSource,
name=Sequence('Test {}'),
type='pg',
options=ConfigurationContainer.from_json('{"dbname": "test"}'),
# If we don't use lambda here it will reuse the same options between tests:
options=lambda: ConfigurationContainer.from_json('{"dbname": "test"}'),
org=1)

dashboard_factory = ModelFactory(redash.models.Dashboard,
Expand Down
36 changes: 36 additions & 0 deletions tests/handlers/test_data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,39 @@ def test_creates_data_source(self):
data={'name': 'DS 1', 'type': 'pg', 'options': {"dbname": "redash"}}, user=admin)

self.assertEqual(rv.status_code, 200)


class TestDataSourcePausePost(BaseTestCase):
def test_pauses_data_source(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin)
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True)

def test_pause_sets_reason(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin, data={'reason': 'testing'})
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'testing')

rv = self.make_request('post', '/api/data_sources/{}/pause?reason=test'.format(self.factory.data_source.id), user=admin)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'test')

def test_requires_admin(self):
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id))
self.assertEqual(rv.status_code, 403)


class TestDataSourcePauseDelete(BaseTestCase):
def test_resumes_data_source(self):
admin = self.factory.create_admin()
self.factory.data_source.pause()
self.factory.data_source.save()
rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin)
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, False)

def test_requires_admin(self):
rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id))
self.assertEqual(rv.status_code, 403)
12 changes: 12 additions & 0 deletions tests/handlers/test_query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ def test_execute_query_with_params(self):
self.assertEquals(rv.status_code, 200)
self.assertIn('job', rv.json)

def test_execute_on_paused_data_source(self):
self.factory.data_source.pause()

rv = self.make_request('post', '/api/query_results',
data={'data_source_id': self.factory.data_source.id,
'query': 'SELECT 1',
'max_age': 0})

self.assertEquals(rv.status_code, 400)
self.assertNotIn('query_result', rv.json)
self.assertIn('job', rv.json)


class TestQueryResultAPI(BaseTestCase):
def test_has_no_access_to_data_source(self):
Expand Down
26 changes: 26 additions & 0 deletions tests/models/test_data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,29 @@ class TestDataSourceCreate(BaseTestCase):
def test_adds_data_source_to_default_group(self):
data_source = DataSource.create_with_group(org=self.factory.org, name='test', options=ConfigurationContainer.from_json('{"dbname": "test"}'), type='pg')
self.assertIn(self.factory.org.default_group.id, data_source.groups)


class TestDataSourceIsPaused(BaseTestCase):
def test_returns_false_by_default(self):
self.assertFalse(self.factory.data_source.paused)

def test_persists_selection(self):
self.factory.data_source.pause()
self.assertTrue(self.factory.data_source.paused)

self.factory.data_source.resume()
self.assertFalse(self.factory.data_source.paused)

def test_allows_setting_reason(self):
reason = "Some good reason."
self.factory.data_source.pause(reason)
self.assertTrue(self.factory.data_source.paused)
self.assertEqual(self.factory.data_source.pause_reason, reason)

def test_resume_clears_reason(self):
self.factory.data_source.pause("Reason")
self.factory.data_source.resume()
self.assertEqual(self.factory.data_source.pause_reason, None)

def test_reason_is_none_by_default(self):
self.assertEqual(self.factory.data_source.pause_reason, None)
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,26 @@ def test_enqueues_outdated_queries(self):
refresh_queries()
add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY)

def test_doesnt_enqueue_outdated_queries_for_paused_data_source(self):
query = self.factory.create_query(schedule="60")
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query=query.query,
query_hash=query.query_hash)
query.latest_query_data = query_result
query.save()

query.data_source.pause()

with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_not_called()

query.data_source.resume()

with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY)

def test_skips_fresh_queries(self):
query = self.factory.create_query(schedule="1200")
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
Expand Down
24 changes: 24 additions & 0 deletions tests/tasks/test_refresh_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import datetime
from mock import patch, call, ANY
from tests import BaseTestCase
from redash.tasks import refresh_schemas


class TestRefreshSchemas(BaseTestCase):
def test_calls_refresh_of_all_data_sources(self):
with patch('redash.models.DataSource.get_schema') as get_schema:
refresh_schemas()
get_schema.assert_called_with(refresh=True)

def test_skips_paused_data_sources(self):
self.factory.data_source.pause()

with patch('redash.models.DataSource.get_schema') as get_schema:
refresh_schemas()
get_schema.assert_not_called()

self.factory.data_source.resume()

with patch('redash.models.DataSource.get_schema') as get_schema:
refresh_schemas()
get_schema.assert_called_with(refresh=True)

0 comments on commit 857caab

Please sign in to comment.