Closes #379: Add a task to monitor data source health.
Marina Samuel authored and Allen Short committed Jul 30, 2018
1 parent 7730b28 commit c08de97
Showing 8 changed files with 224 additions and 3 deletions.
2 changes: 2 additions & 0 deletions redash/monitor.py
@@ -1,3 +1,4 @@
+import json
 from redash import redis_connection, models, __version__, settings
 
 
@@ -14,6 +15,7 @@ def get_object_counts():
     status['unused_query_results_count'] = models.QueryResult.unused().count()
     status['dashboards_count'] = models.Dashboard.query.count()
     status['widgets_count'] = models.Widget.query.count()
+    status['data_sources'] = json.loads(redis_connection.get('data_sources:health') or '{}')
     return status
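
Not part of the commit, but a quick sketch of the or-'{}' fallback used in get_object_counts() above: an absent Redis key (None) parses to an empty dict, so the status payload stays valid before the health task has ever run.

import json

# An absent key comes back as None; the fallback substitutes an empty JSON object.
assert json.loads(None or '{}') == {}
# Once the health task has written data, the stored JSON string parses as usual.
assert json.loads('{"1": {"status": "FAIL"}}' or '{}') == {"1": {"status": "FAIL"}}
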
6 changes: 4 additions & 2 deletions redash/query_runner/__init__.py
@@ -102,10 +102,12 @@ def get_data_source_version(self):
 
         return version
 
-    def test_connection(self):
+    def test_connection(self, custom_query_text=None):
         if self.noop_query is None:
             raise NotImplementedError()
-        data, error = self.run_query(self.noop_query, None)
+
+        query_text = custom_query_text or self.noop_query
+        data, error = self.run_query(query_text, None)
 
         if error is not None:
             raise Exception(error)
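
For illustration (not from the commit), a minimal sketch of the new optional argument using a stubbed runner. StubRunner, its noop_query text and the assertion are made up for the example, and it assumes a Redash development environment where redash.query_runner is importable.

from redash.query_runner import BaseQueryRunner


class StubRunner(BaseQueryRunner):
    # A fake runner: noop_query and run_query are stand-ins, not a real integration.
    noop_query = "SELECT 1"

    def __init__(self):
        # Skip BaseQueryRunner.__init__ (it expects a configuration dict) for this sketch.
        self.ran = []

    def run_query(self, query, user):
        self.ran.append(query)
        return "{}", None  # (data, error)


runner = StubRunner()
runner.test_connection()              # falls back to the runner's noop_query
runner.test_connection("select 99")   # runs the supplied custom query instead
assert runner.ran == ["SELECT 1", "select 99"]
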
13 changes: 12 additions & 1 deletion redash/settings/__init__.py
@@ -1,7 +1,7 @@
 import os
 from funcy import distinct, remove
 
-from .helpers import parse_db_url, fix_assets_path, array_from_string, parse_boolean, int_or_none, set_from_string
+from .helpers import parse_db_url, fix_assets_path, array_from_string, parse_boolean, int_or_none, set_from_string, dict_from_string
 
 
 def all_settings():
@@ -242,3 +242,14 @@ def all_settings():
 # Allow Parameters in Embeds
 # WARNING: With this option enabled, Redash reads query parameters from the request URL (risk of SQL injection!)
 ALLOW_PARAMETERS_IN_EMBEDS = parse_boolean(os.environ.get("REDASH_ALLOW_PARAMETERS_IN_EMBEDS", "false"))
+
+# Allows a map of custom queries used to test data source performance and availability.
+# A sample map may look like:
+# {
+#     "1": "select 1;",
+#     "5": "select 1;"
+# }
+CUSTOM_HEALTH_QUERIES = dict_from_string(os.environ.get("REDASH_CUSTOM_HEALTH_QUERIES", ""))
+
+# Frequency of health query runs, in minutes (12 hours by default).
+HEALTH_QUERIES_REFRESH_SCHEDULE = int(os.environ.get("REDASH_HEALTH_QUERIES_REFRESH_SCHEDULE", 720))
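
As a usage sketch with illustrative values (only the 720-minute default comes from the commit), the two environment variables are parsed roughly as follows; it assumes a Redash development environment, and in a real deployment the variables must be exported before Redash loads its settings.

import os

# Illustrative values: a health-check query for data source id 1, refreshed hourly.
os.environ["REDASH_CUSTOM_HEALTH_QUERIES"] = '{"1": "select 1;"}'
os.environ["REDASH_HEALTH_QUERIES_REFRESH_SCHEDULE"] = "60"

from redash.settings.helpers import dict_from_string

custom_queries = dict_from_string(os.environ.get("REDASH_CUSTOM_HEALTH_QUERIES", ""))
refresh_minutes = int(os.environ.get("REDASH_HEALTH_QUERIES_REFRESH_SCHEDULE", 720))

assert custom_queries == {"1": "select 1;"}
assert refresh_minutes == 60
assert dict_from_string("not json") == {}  # malformed input falls back to an empty map
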
6 changes: 6 additions & 0 deletions redash/settings/helpers.py
@@ -33,6 +33,12 @@ def array_from_string(s):
 
     return [item.strip() for item in array]
 
+def dict_from_string(s):
+    try:
+        return json.loads(s)
+    except ValueError:
+        return {}
+
 
 def set_from_string(s):
     return set(array_from_string(s))
1 change: 1 addition & 0 deletions redash/tasks/__init__.py
@@ -1,3 +1,4 @@
 from .general import record_event, version_check, send_mail
+from .health import health_status
 from .queries import QueryTask, refresh_queries, refresh_schemas, cleanup_tasks, cleanup_query_results, execute_query
 from .alerts import check_alerts_for_query
59 changes: 59 additions & 0 deletions redash/tasks/health.py
@@ -0,0 +1,59 @@
import json
import time
from random import randint

from celery.utils.log import get_task_logger
from redash import models, redis_connection, settings, statsd_client
from redash.worker import celery
from redash.utils import parse_human_time

logger = get_task_logger(__name__)


def update_health_status(data_source_id, data_source_name, query_text, data):
    key = "data_sources:health"

    cache = json.loads(redis_connection.get(key) or '{}')
    if data_source_id not in cache:
        cache[data_source_id] = {
            "metadata": {"name": data_source_name},
            "queries": {}
        }
    cache[data_source_id]["queries"][query_text] = data

    cache[data_source_id]["status"] = "SUCCESS"
    for query_status in cache[data_source_id]["queries"].values():
        if query_status["status"] == "FAIL":
            cache[data_source_id]["status"] = "FAIL"
            break

    redis_connection.set(key, json.dumps(cache))

@celery.task(name="redash.tasks.health_status", time_limit=90, soft_time_limit=60)
def health_status():
    for ds in models.DataSource.query:
        logger.info(u"task=health_status state=start ds_id=%s", ds.id)

        runtime = None
        query_text = ds.query_runner.noop_query
        custom_queries = settings.CUSTOM_HEALTH_QUERIES
        ds_id = str(ds.id)

        if custom_queries and ds_id in custom_queries:
            query_text = custom_queries[ds_id]

        try:
            start_time = time.time()
            ds.query_runner.test_connection(query_text)
            runtime = time.time() - start_time
        except Exception as e:
            logger.warning(u"Failed health check for the data source: %s", ds.name, exc_info=1)
            statsd_client.incr('health_status.error')
            logger.info(u"task=health_status state=error ds_id=%s runtime=%.2f", ds.id, time.time() - start_time)

        update_health_status(ds_id, ds.name, query_text, {
            "status": "SUCCESS" if runtime is not None else "FAIL",
            "last_run": start_time,
            "last_run_human": str(parse_human_time(str(start_time))),
            "runtime": runtime
        })
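
A short sketch, assuming a configured Redash environment with at least one data source, of running the task out of band and reading back the cache it maintains.

import json

from redash import redis_connection
from redash.tasks import health_status

# Run synchronously for illustration; in production Celery beat triggers this on the
# HEALTH_QUERIES_REFRESH_SCHEDULE interval (health_status.delay() would queue it instead).
health_status()

cache = json.loads(redis_connection.get('data_sources:health') or '{}')
for ds_id, entry in cache.items():
    print("%s (%s): %s" % (entry["metadata"]["name"], ds_id, entry["status"]))
    for query_text, result in entry["queries"].items():
        print("    %s -> %s (runtime: %s)" % (query_text, result["status"], result["runtime"]))
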
4 changes: 4 additions & 0 deletions redash/worker.py
@@ -16,6 +16,10 @@
                 include='redash.tasks')
 
 celery_schedule = {
+    'health_status': {
+        'task': 'redash.tasks.health_status',
+        'schedule': timedelta(minutes=settings.HEALTH_QUERIES_REFRESH_SCHEDULE)
+    },
     'refresh_queries': {
         'task': 'redash.tasks.refresh_queries',
         'schedule': timedelta(seconds=30)
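
For reference, a tiny sketch (not from the commit) of what the default schedule entry works out to; 720 minutes is the default from the settings change above.

from datetime import timedelta

# Default cadence: the health check runs every 12 hours;
# REDASH_HEALTH_QUERIES_REFRESH_SCHEDULE overrides it.
interval = timedelta(minutes=720)
assert interval == timedelta(hours=12)
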
136 changes: 136 additions & 0 deletions tests/tasks/test_health.py
@@ -0,0 +1,136 @@
import json
import mock
from tests import BaseTestCase

from redash import redis_connection
from redash.tasks.health import update_health_status, health_status


class TestHealthStatus(BaseTestCase):
    def setUp(self):
        super(TestHealthStatus, self).setUp()
        self.patched_custom_queries = self._setup_mock('redash.tasks.health.settings')
        self.patched_updated_health_status = self._setup_mock('redash.tasks.health.update_health_status')
        self.patched_run_query = self._setup_mock('redash.query_runner.pg.PostgreSQL.run_query')

        self.patched_run_query.return_value = ("some_data", None)
        self.patched_custom_queries.CUSTOM_HEALTH_QUERIES = ""

    def _setup_mock(self, function_to_patch):
        patcher = mock.patch(function_to_patch)
        patched_function = patcher.start()
        self.addCleanup(patcher.stop)
        return patched_function

    def test_update_health_status_sets_correct_keys(self):
        current_health = redis_connection.get('data_sources:health')
        self.assertEqual(None, current_health)

        DATA_SOURCE = self.factory.create_data_source()
        QUERY_SUCCESS = "SELECT 1"
        QUERY_FAIL = "SELECT meep"
        SOME_DATA_FAIL = {"a": "b", "foo": "bar", "status": "FAIL"}
        SOME_DATA_SUCCESS = {"a": "b", "foo": "bar", "status": "SUCCESS"}
        update_health_status(str(DATA_SOURCE.id), DATA_SOURCE.name, QUERY_FAIL, SOME_DATA_FAIL)
        update_health_status(str(DATA_SOURCE.id), DATA_SOURCE.name, QUERY_SUCCESS, SOME_DATA_SUCCESS)

        '''
        The expected format of the cached health data is:
        {
            "<data_source_id>": {
                "metadata": {"name": "<data_source_name>"},
                "status": "SUCCESS" or "FAIL",
                "queries": {
                    "<query_text>": {...},
                    "<query_text>": {...},
                    ...
                }
            },
            ...
        }
        '''
        current_health = json.loads(redis_connection.get('data_sources:health'))

        # There is 1 data source.
        self.assertEqual(1, len(current_health.keys()))
        self.assertEqual(DATA_SOURCE.id, int(current_health.keys()[0]))

        # The data source has "metadata", "queries" and "status" keys.
        ds_id = str(DATA_SOURCE.id)
        self.assertEqual(3, len(current_health[ds_id].keys()))
        self.assertTrue("metadata" in current_health[ds_id].keys())
        self.assertTrue("queries" in current_health[ds_id].keys())
        self.assertTrue("status" in current_health[ds_id].keys())

        # There are two queries with correct data.
        self.assertEqual(2, len(current_health[ds_id]["queries"]))
        self.assertTrue(QUERY_SUCCESS in current_health[ds_id]["queries"].keys())
        self.assertTrue(QUERY_FAIL in current_health[ds_id]["queries"].keys())
        self.assertEqual(SOME_DATA_FAIL, current_health[ds_id]["queries"][QUERY_FAIL])
        self.assertEqual(SOME_DATA_SUCCESS, current_health[ds_id]["queries"][QUERY_SUCCESS])
        self.assertEqual(SOME_DATA_FAIL["status"], current_health[ds_id]["status"])

    def test_health_status_success(self):
        data_sources = []
        for i in range(5):
            data_sources.append(self.factory.create_data_source())

        health_status()

        # Status is updated for each of the 5 data sources.
        self.assertEqual(self.patched_updated_health_status.call_count, 5)

        # The data source name and id are correctly passed in the last call of update_health_status().
        args, kwargs = self.patched_updated_health_status.call_args
        self.assertEqual(str(data_sources[-1].id), args[0])
        self.assertEqual(data_sources[-1].name, args[1])

        # All expected status keys are available.
        EXPECTED_KEYS = ["status", "last_run", "last_run_human", "runtime"]
        NEW_STATUS = args[3]
        new_status_keys = set(NEW_STATUS.keys())
        self.assertEqual(set(EXPECTED_KEYS), new_status_keys)

        self.assertEqual("SUCCESS", NEW_STATUS["status"])
        for key in EXPECTED_KEYS[1:]:
            self.assertIsNotNone(NEW_STATUS[key])

    def test_health_status_run_query_throws_exception(self):
        data_source = self.factory.create_data_source()

        def exception_raiser(*args, **kwargs):
            raise Exception

        self.patched_run_query.side_effect = exception_raiser
        health_status()

        # Status is updated for the one data source.
        self.assertEqual(self.patched_updated_health_status.call_count, 1)

        # The data source name is correctly passed in the last call of update_health_status().
        args, kwargs = self.patched_updated_health_status.call_args
        self.assertEqual(str(data_source.id), args[0])
        self.assertEqual(data_source.name, args[1])
        self.assertEqual(data_source.query_runner.noop_query, args[2])

        # All expected status keys are available.
        EXPECTED_KEYS = ['status', 'last_run', 'last_run_human', 'runtime']
        NEW_STATUS = args[3]
        new_status_keys = set(NEW_STATUS.keys())
        self.assertEqual(set(EXPECTED_KEYS), new_status_keys)

        self.assertEqual('FAIL', NEW_STATUS['status'])
        self.assertIsNotNone(NEW_STATUS['last_run'])
        self.assertIsNotNone(NEW_STATUS['last_run_human'])
        self.assertIsNone(NEW_STATUS['runtime'])

    def test_health_status_custom_query(self):
        CUSTOM_QUERY = "select * from table"
        data_source = self.factory.create_data_source()
        self.patched_custom_queries.CUSTOM_HEALTH_QUERIES = {"1": CUSTOM_QUERY}

        health_status()

        args, kwargs = self.patched_updated_health_status.call_args
        self.assertNotEqual(data_source.query_runner.noop_query, args[2])
        self.assertEqual(CUSTOM_QUERY, args[2])