From 9538ee7c318a28c11c327e950cf1761fef7af56e Mon Sep 17 00:00:00 2001 From: Arik Fraimovich Date: Mon, 30 May 2016 18:30:05 +0300 Subject: [PATCH 1/2] Feature: API to pause a data source --- redash/handlers/api.py | 3 +- redash/handlers/data_sources.py | 34 +++++++++++++++++++++++ redash/handlers/query_results.py | 15 ++++++++-- redash/models.py | 19 ++++++++++++- redash/tasks/queries.py | 23 +++++++++------ redash/utils/configuration.py | 3 ++ tests/factories.py | 3 +- tests/handlers/test_data_sources.py | 33 ++++++++++++++++++++++ tests/handlers/test_query_results.py | 13 +++++++++ tests/models/test_data_sources.py | 26 +++++++++++++++++ tests/{ => tasks}/test_refresh_queries.py | 22 +++++++++++++++ tests/tasks/test_refresh_schemas.py | 19 +++++++++++++ 12 files changed, 200 insertions(+), 13 deletions(-) rename tests/{ => tasks}/test_refresh_queries.py (83%) create mode 100644 tests/tasks/test_refresh_schemas.py diff --git a/redash/handlers/api.py b/redash/handlers/api.py index ebd36903f8..142927a7ca 100644 --- a/redash/handlers/api.py +++ b/redash/handlers/api.py @@ -6,7 +6,7 @@ from redash.handlers.base import org_scoped_rule from redash.handlers.alerts import AlertResource, AlertListResource, AlertSubscriptionListResource, AlertSubscriptionResource from redash.handlers.dashboards import DashboardListResource, RecentDashboardsResource, DashboardResource, DashboardShareResource -from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource +from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource, DataSourcePauseResource from redash.handlers.events import EventResource from redash.handlers.queries import QueryRefreshResource, QueryListResource, QueryRecentResource, QuerySearchResource, QueryResource from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource @@ -49,6 +49,7 @@ def json_representation(data, code, headers=None): api.add_org_resource(DataSourceTypeListResource, '/api/data_sources/types', endpoint='data_source_types') api.add_org_resource(DataSourceListResource, '/api/data_sources', endpoint='data_sources') api.add_org_resource(DataSourceSchemaResource, '/api/data_sources//schema') +api.add_org_resource(DataSourcePauseResource, '/api/data_sources//pause') api.add_org_resource(DataSourceResource, '/api/data_sources/', endpoint='data_source') api.add_org_resource(GroupListResource, '/api/groups', endpoint='groups') diff --git a/redash/handlers/data_sources.py b/redash/handlers/data_sources.py index 07f4441c86..70623a4de6 100644 --- a/redash/handlers/data_sources.py +++ b/redash/handlers/data_sources.py @@ -106,3 +106,37 @@ def get(self, data_source_id): return schema + +class DataSourcePauseResource(BaseResource): + @require_admin + def post(self, data_source_id): + data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org) + data = request.get_json(force=True, silent=True) + if data: + reason = data.get('reason') + else: + reason = None + data_source.pause(reason) + data_source.save() + + self.record_event({ + 'action': 'pause', + 'object_id': data_source.id, + 'object_type': 'datasource' + }) + + return data_source.to_dict() + + @require_admin + def delete(self, data_source_id): + data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org) + data_source.resume() + data_source.save() + + self.record_event({ + 'action': 'resume', + 'object_id': data_source.id, + 'object_type': 'datasource' + }) + + return data_source.to_dict() diff --git a/redash/handlers/query_results.py b/redash/handlers/query_results.py index a89f94d991..7fd1ce37fe 100644 --- a/redash/handlers/query_results.py +++ b/redash/handlers/query_results.py @@ -16,12 +16,23 @@ from redash.tasks.queries import enqueue_query +def error_response(message): + return {'job': {'status': 4, 'error': message}}, 400 + + def run_query(data_source, parameter_values, query_text, query_id, max_age=0): query_parameters = set(collect_query_parameters(query_text)) missing_params = set(query_parameters) - set(parameter_values.keys()) if missing_params: - return {'job': {'status': 4, - 'error': 'Missing parameter value for: {}'.format(", ".join(missing_params))}}, 400 + return error_response('Missing parameter value for: {}'.format(", ".join(missing_params))) + + if data_source.is_paused: + if data_source.pause_reason: + message = '{} is paused ({}). Please try later.'.format(data_source.name, data_source.pause_reason) + else: + message = '{} is paused. Please try later.'.format(data_source.name) + + return error_response(message) if query_parameters: query_text = pystache.render(query_text, parameter_values) diff --git a/redash/models.py b/redash/models.py index 8eb346521c..e32ba65c26 100644 --- a/redash/models.py +++ b/redash/models.py @@ -372,7 +372,8 @@ def to_dict(self, all=False, with_permissions=False): 'id': self.id, 'name': self.name, 'type': self.type, - 'syntax': self.query_runner.syntax + 'syntax': self.query_runner.syntax, + 'is_paused': self.is_paused } if all: @@ -414,6 +415,22 @@ def get_schema(self, refresh=False): return schema + @property + def is_paused(self): + return self.options.get('is_paused', False) + + @property + def pause_reason(self): + return self.options.get('pause_reason', None) + + def pause(self, reason=None): + self.options['is_paused'] = True + self.options['pause_reason'] = reason + + def resume(self): + self.options['is_paused'] = False + self.options['pause_reason'] = None + def add_group(self, group, view_only=False): dsg = DataSourceGroup.create(group=group, data_source=self, view_only=view_only) setattr(self, 'data_source_groups', dsg) diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py index f5ecdd0ba9..9d1ad9b859 100644 --- a/redash/tasks/queries.py +++ b/redash/tasks/queries.py @@ -264,9 +264,13 @@ def refresh_queries(): with statsd_client.timer('manager.outdated_queries_lookup'): for query in models.Query.outdated_queries(): - enqueue_query(query.query, query.data_source, - scheduled=True, - metadata={'Query ID': query.id, 'Username': 'Scheduled'}) + if query.data_source.is_paused: + logging.info("Skipping refresh of %s because datasource - %s is paused (%s).", query.id, query.data_source.name, query.data_source.pause_reason) + else: + enqueue_query(query.query, query.data_source, + scheduled=True, + metadata={'Query ID': query.id, 'Username': 'Scheduled'}) + query_ids.append(query.id) outdated_queries_count += 1 @@ -344,11 +348,14 @@ def refresh_schemas(): Refreshes the data sources schemas. """ for ds in models.DataSource.select(): - logger.info("Refreshing schema for: {}".format(ds.name)) - try: - ds.get_schema(refresh=True) - except Exception: - logger.exception("Failed refreshing schema for the data source: %s", ds.name) + if ds.is_paused: + logger.info(u"Skipping refresh schema of %s because it is paused (%s).", ds.name, ds.pause_reason) + else: + logger.info(u"Refreshing schema for: {}".format(ds.name)) + try: + ds.get_schema(refresh=True) + except Exception: + logger.exception(u"Failed refreshing schema for the data source: %s", ds.name) def signal_handler(*args): diff --git a/redash/utils/configuration.py b/redash/utils/configuration.py index 1b8637cc1a..676c859201 100644 --- a/redash/utils/configuration.py +++ b/redash/utils/configuration.py @@ -63,6 +63,9 @@ def update(self, new_config): def get(self, *args, **kwargs): return self._config.get(*args, **kwargs) + def __setitem__(self, key, value): + self._config[key] = value + def __getitem__(self, item): if item in self._config: return self._config[item] diff --git a/tests/factories.py b/tests/factories.py index 88976e0030..ccea69bb97 100644 --- a/tests/factories.py +++ b/tests/factories.py @@ -52,7 +52,8 @@ def __call__(self): data_source_factory = ModelFactory(redash.models.DataSource, name=Sequence('Test {}'), type='pg', - options=ConfigurationContainer.from_json('{"dbname": "test"}'), + # If we don't use lambda here it will reuse the same options between tests: + options=lambda: ConfigurationContainer.from_json('{"dbname": "test"}'), org=1) dashboard_factory = ModelFactory(redash.models.Dashboard, diff --git a/tests/handlers/test_data_sources.py b/tests/handlers/test_data_sources.py index ee9f3af595..b03e1f0b30 100644 --- a/tests/handlers/test_data_sources.py +++ b/tests/handlers/test_data_sources.py @@ -96,3 +96,36 @@ def test_creates_data_source(self): data={'name': 'DS 1', 'type': 'pg', 'options': {"dbname": "redash"}}, user=admin) self.assertEqual(rv.status_code, 200) + + +class TestDataSourcePausePost(BaseTestCase): + def test_pauses_data_source(self): + admin = self.factory.create_admin() + rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin) + self.assertEqual(rv.status_code, 200) + self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).is_paused, True) + + def test_pause_sets_reason(self): + admin = self.factory.create_admin() + rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin, data={'reason': 'testing'}) + self.assertEqual(rv.status_code, 200) + self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).is_paused, True) + self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'testing') + + def test_requires_admin(self): + rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id)) + self.assertEqual(rv.status_code, 403) + + +class TestDataSourcePauseDelete(BaseTestCase): + def test_resumes_data_source(self): + admin = self.factory.create_admin() + self.factory.data_source.pause() + self.factory.data_source.save() + rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin) + self.assertEqual(rv.status_code, 200) + self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).is_paused, False) + + def test_requires_admin(self): + rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id)) + self.assertEqual(rv.status_code, 403) diff --git a/tests/handlers/test_query_results.py b/tests/handlers/test_query_results.py index 645f725b5f..c1235cf7f7 100644 --- a/tests/handlers/test_query_results.py +++ b/tests/handlers/test_query_results.py @@ -73,6 +73,19 @@ def test_execute_query_with_params(self): self.assertEquals(rv.status_code, 200) self.assertIn('job', rv.json) + def test_execute_on_paused_data_source(self): + self.factory.data_source.pause() + self.factory.data_source.save() + + rv = self.make_request('post', '/api/query_results', + data={'data_source_id': self.factory.data_source.id, + 'query': 'SELECT 1', + 'max_age': 0}) + + self.assertEquals(rv.status_code, 400) + self.assertNotIn('query_result', rv.json) + self.assertIn('job', rv.json) + class TestQueryResultAPI(BaseTestCase): def test_has_no_access_to_data_source(self): diff --git a/tests/models/test_data_sources.py b/tests/models/test_data_sources.py index fef9e7869f..eb14e579f8 100644 --- a/tests/models/test_data_sources.py +++ b/tests/models/test_data_sources.py @@ -7,3 +7,29 @@ class TestDataSourceCreate(BaseTestCase): def test_adds_data_source_to_default_group(self): data_source = DataSource.create_with_group(org=self.factory.org, name='test', options=ConfigurationContainer.from_json('{"dbname": "test"}'), type='pg') self.assertIn(self.factory.org.default_group.id, data_source.groups) + + +class TestDataSourceIsPaused(BaseTestCase): + def test_returns_false_by_default(self): + self.assertFalse(self.factory.data_source.is_paused) + + def test_persists_selection(self): + self.factory.data_source.pause() + self.assertTrue(self.factory.data_source.is_paused) + + self.factory.data_source.resume() + self.assertFalse(self.factory.data_source.is_paused) + + def test_allows_setting_reason(self): + reason = "Some good reason." + self.factory.data_source.pause(reason) + self.assertTrue(self.factory.data_source.is_paused) + self.assertEqual(self.factory.data_source.pause_reason, reason) + + def test_resume_clears_reason(self): + self.factory.data_source.pause("Reason") + self.factory.data_source.resume() + self.assertEqual(self.factory.data_source.pause_reason, None) + + def test_reason_is_none_by_default(self): + self.assertEqual(self.factory.data_source.pause_reason, None) diff --git a/tests/test_refresh_queries.py b/tests/tasks/test_refresh_queries.py similarity index 83% rename from tests/test_refresh_queries.py rename to tests/tasks/test_refresh_queries.py index 2ebfbd025c..e066f9ce4a 100644 --- a/tests/test_refresh_queries.py +++ b/tests/tasks/test_refresh_queries.py @@ -21,6 +21,28 @@ def test_enqueues_outdated_queries(self): refresh_queries() add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY) + def test_doesnt_enqueue_outdated_queries_for_paused_data_source(self): + query = self.factory.create_query(schedule="60") + retrieved_at = utcnow() - datetime.timedelta(minutes=10) + query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query=query.query, + query_hash=query.query_hash) + query.latest_query_data = query_result + query.save() + + query.data_source.pause() + query.data_source.save() + + with patch('redash.tasks.queries.enqueue_query') as add_job_mock: + refresh_queries() + add_job_mock.assert_not_called() + + query.data_source.resume() + query.data_source.save() + + with patch('redash.tasks.queries.enqueue_query') as add_job_mock: + refresh_queries() + add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY) + def test_skips_fresh_queries(self): query = self.factory.create_query(schedule="1200") retrieved_at = utcnow() - datetime.timedelta(minutes=10) diff --git a/tests/tasks/test_refresh_schemas.py b/tests/tasks/test_refresh_schemas.py new file mode 100644 index 0000000000..f9f99ebc4e --- /dev/null +++ b/tests/tasks/test_refresh_schemas.py @@ -0,0 +1,19 @@ +import datetime +from mock import patch, call, ANY +from tests import BaseTestCase +from redash.tasks import refresh_schemas + + +class TestRefreshSchemas(BaseTestCase): + def test_calls_refresh_of_all_data_sources(self): + with patch('redash.models.DataSource.get_schema') as get_schema: + refresh_schemas() + get_schema.assert_called_with(refresh=True) + + def test_skips_paused_data_sources(self): + self.factory.data_source.pause() + self.factory.data_source.save() + + with patch('redash.models.DataSource.get_schema') as get_schema: + refresh_schemas() + get_schema.assert_not_called() From 59f8af2c44b6bf68626a743f42730e156a52b15f Mon Sep 17 00:00:00 2001 From: Arik Fraimovich Date: Mon, 30 May 2016 22:44:09 +0300 Subject: [PATCH 2/2] Switch to Redis for pause state storage --- rd_ui/app/scripts/services/resources.js | 2 ++ redash/handlers/data_sources.py | 3 ++- redash/handlers/query_results.py | 2 +- redash/models.py | 18 ++++++++++-------- redash/tasks/queries.py | 4 ++-- tests/handlers/test_data_sources.py | 9 ++++++--- tests/handlers/test_query_results.py | 1 - tests/models/test_data_sources.py | 8 ++++---- tests/tasks/test_refresh_queries.py | 2 -- tests/tasks/test_refresh_schemas.py | 7 ++++++- 10 files changed, 33 insertions(+), 23 deletions(-) diff --git a/rd_ui/app/scripts/services/resources.js b/rd_ui/app/scripts/services/resources.js index 5179e48fea..1751e562a1 100644 --- a/rd_ui/app/scripts/services/resources.js +++ b/rd_ui/app/scripts/services/resources.js @@ -406,6 +406,8 @@ }, function(error) { if (error.status === 403) { queryResult.update(error.data); + } else if (error.status === 400 && 'job' in error.data) { + queryResult.update(error.data); } }); diff --git a/redash/handlers/data_sources.py b/redash/handlers/data_sources.py index 70623a4de6..eb48891651 100644 --- a/redash/handlers/data_sources.py +++ b/redash/handlers/data_sources.py @@ -115,7 +115,8 @@ def post(self, data_source_id): if data: reason = data.get('reason') else: - reason = None + reason = request.args.get('reason') + data_source.pause(reason) data_source.save() diff --git a/redash/handlers/query_results.py b/redash/handlers/query_results.py index 7fd1ce37fe..dc44f6eeb8 100644 --- a/redash/handlers/query_results.py +++ b/redash/handlers/query_results.py @@ -26,7 +26,7 @@ def run_query(data_source, parameter_values, query_text, query_id, max_age=0): if missing_params: return error_response('Missing parameter value for: {}'.format(", ".join(missing_params))) - if data_source.is_paused: + if data_source.paused: if data_source.pause_reason: message = '{} is paused ({}). Please try later.'.format(data_source.name, data_source.pause_reason) else: diff --git a/redash/models.py b/redash/models.py index e32ba65c26..ee733a4a2d 100644 --- a/redash/models.py +++ b/redash/models.py @@ -373,7 +373,8 @@ def to_dict(self, all=False, with_permissions=False): 'name': self.name, 'type': self.type, 'syntax': self.query_runner.syntax, - 'is_paused': self.is_paused + 'paused': self.paused, + 'pause_reason': self.pause_reason } if all: @@ -415,21 +416,22 @@ def get_schema(self, refresh=False): return schema + def _pause_key(self): + return 'ds:{}:pause'.format(self.id) + @property - def is_paused(self): - return self.options.get('is_paused', False) + def paused(self): + return redis_connection.exists(self._pause_key()) @property def pause_reason(self): - return self.options.get('pause_reason', None) + return redis_connection.get(self._pause_key()) def pause(self, reason=None): - self.options['is_paused'] = True - self.options['pause_reason'] = reason + redis_connection.set(self._pause_key(), reason) def resume(self): - self.options['is_paused'] = False - self.options['pause_reason'] = None + redis_connection.delete(self._pause_key()) def add_group(self, group, view_only=False): dsg = DataSourceGroup.create(group=group, data_source=self, view_only=view_only) diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py index 9d1ad9b859..4ce6dbd561 100644 --- a/redash/tasks/queries.py +++ b/redash/tasks/queries.py @@ -264,7 +264,7 @@ def refresh_queries(): with statsd_client.timer('manager.outdated_queries_lookup'): for query in models.Query.outdated_queries(): - if query.data_source.is_paused: + if query.data_source.paused: logging.info("Skipping refresh of %s because datasource - %s is paused (%s).", query.id, query.data_source.name, query.data_source.pause_reason) else: enqueue_query(query.query, query.data_source, @@ -348,7 +348,7 @@ def refresh_schemas(): Refreshes the data sources schemas. """ for ds in models.DataSource.select(): - if ds.is_paused: + if ds.paused: logger.info(u"Skipping refresh schema of %s because it is paused (%s).", ds.name, ds.pause_reason) else: logger.info(u"Refreshing schema for: {}".format(ds.name)) diff --git a/tests/handlers/test_data_sources.py b/tests/handlers/test_data_sources.py index b03e1f0b30..67147d6273 100644 --- a/tests/handlers/test_data_sources.py +++ b/tests/handlers/test_data_sources.py @@ -103,15 +103,18 @@ def test_pauses_data_source(self): admin = self.factory.create_admin() rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin) self.assertEqual(rv.status_code, 200) - self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).is_paused, True) + self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True) def test_pause_sets_reason(self): admin = self.factory.create_admin() rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin, data={'reason': 'testing'}) self.assertEqual(rv.status_code, 200) - self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).is_paused, True) + self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True) self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'testing') + rv = self.make_request('post', '/api/data_sources/{}/pause?reason=test'.format(self.factory.data_source.id), user=admin) + self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'test') + def test_requires_admin(self): rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id)) self.assertEqual(rv.status_code, 403) @@ -124,7 +127,7 @@ def test_resumes_data_source(self): self.factory.data_source.save() rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin) self.assertEqual(rv.status_code, 200) - self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).is_paused, False) + self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, False) def test_requires_admin(self): rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id)) diff --git a/tests/handlers/test_query_results.py b/tests/handlers/test_query_results.py index c1235cf7f7..8731e45e57 100644 --- a/tests/handlers/test_query_results.py +++ b/tests/handlers/test_query_results.py @@ -75,7 +75,6 @@ def test_execute_query_with_params(self): def test_execute_on_paused_data_source(self): self.factory.data_source.pause() - self.factory.data_source.save() rv = self.make_request('post', '/api/query_results', data={'data_source_id': self.factory.data_source.id, diff --git a/tests/models/test_data_sources.py b/tests/models/test_data_sources.py index eb14e579f8..944284ac15 100644 --- a/tests/models/test_data_sources.py +++ b/tests/models/test_data_sources.py @@ -11,19 +11,19 @@ def test_adds_data_source_to_default_group(self): class TestDataSourceIsPaused(BaseTestCase): def test_returns_false_by_default(self): - self.assertFalse(self.factory.data_source.is_paused) + self.assertFalse(self.factory.data_source.paused) def test_persists_selection(self): self.factory.data_source.pause() - self.assertTrue(self.factory.data_source.is_paused) + self.assertTrue(self.factory.data_source.paused) self.factory.data_source.resume() - self.assertFalse(self.factory.data_source.is_paused) + self.assertFalse(self.factory.data_source.paused) def test_allows_setting_reason(self): reason = "Some good reason." self.factory.data_source.pause(reason) - self.assertTrue(self.factory.data_source.is_paused) + self.assertTrue(self.factory.data_source.paused) self.assertEqual(self.factory.data_source.pause_reason, reason) def test_resume_clears_reason(self): diff --git a/tests/tasks/test_refresh_queries.py b/tests/tasks/test_refresh_queries.py index e066f9ce4a..a0e496351d 100644 --- a/tests/tasks/test_refresh_queries.py +++ b/tests/tasks/test_refresh_queries.py @@ -30,14 +30,12 @@ def test_doesnt_enqueue_outdated_queries_for_paused_data_source(self): query.save() query.data_source.pause() - query.data_source.save() with patch('redash.tasks.queries.enqueue_query') as add_job_mock: refresh_queries() add_job_mock.assert_not_called() query.data_source.resume() - query.data_source.save() with patch('redash.tasks.queries.enqueue_query') as add_job_mock: refresh_queries() diff --git a/tests/tasks/test_refresh_schemas.py b/tests/tasks/test_refresh_schemas.py index f9f99ebc4e..a361127ca4 100644 --- a/tests/tasks/test_refresh_schemas.py +++ b/tests/tasks/test_refresh_schemas.py @@ -12,8 +12,13 @@ def test_calls_refresh_of_all_data_sources(self): def test_skips_paused_data_sources(self): self.factory.data_source.pause() - self.factory.data_source.save() with patch('redash.models.DataSource.get_schema') as get_schema: refresh_schemas() get_schema.assert_not_called() + + self.factory.data_source.resume() + + with patch('redash.models.DataSource.get_schema') as get_schema: + refresh_schemas() + get_schema.assert_called_with(refresh=True)