diff --git a/superset/app.py b/superset/app.py index 43089326edc5c..4878a83d6afe2 100644 --- a/superset/app.py +++ b/superset/app.py @@ -144,7 +144,13 @@ def init_views(self) -> None: from superset.datasets.api import DatasetRestApi from superset.queries.api import QueryRestApi from superset.views.access_requests import AccessRequestsModelView - from superset.views.alerts import AlertLogModelView, AlertModelView + from superset.views.alerts import ( + AlertLogModelView, + AlertModelView, + AlertObservationModelView, + ValidatorInlineView, + SQLObserverInlineView, + ) from superset.views.annotations import ( AnnotationLayerModelView, AnnotationModelView, @@ -399,6 +405,9 @@ def init_views(self) -> None: category_label=__("Manage"), icon="fa-exclamation-triangle", ) + appbuilder.add_view_no_menu(SQLObserverInlineView) + appbuilder.add_view_no_menu(ValidatorInlineView) + appbuilder.add_view_no_menu(AlertObservationModelView) appbuilder.add_view_no_menu(AlertLogModelView) # diff --git a/superset/migrations/versions/2e5a0ee25ed4_refractor_alerting.py b/superset/migrations/versions/2e5a0ee25ed4_refractor_alerting.py new file mode 100644 index 0000000000000..98bd8a4f54443 --- /dev/null +++ b/superset/migrations/versions/2e5a0ee25ed4_refractor_alerting.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +"""refractor_alerting + +Revision ID: 2e5a0ee25ed4 +Revises: f80a3b88324b +Create Date: 2020-08-31 20:30:30.781478 + +""" + +# revision identifiers, used by Alembic. +revision = "2e5a0ee25ed4" +down_revision = "f80a3b88324b" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import mysql + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "alert_validators", + sa.Column("created_on", sa.DateTime(), nullable=True), + sa.Column("changed_on", sa.DateTime(), nullable=True), + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("validator_type", sa.String(length=100), nullable=False), + sa.Column("config", sa.Text(), nullable=True), + sa.Column("created_by_fk", sa.Integer(), nullable=True), + sa.Column("changed_by_fk", sa.Integer(), nullable=True), + sa.Column("alert_id", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(["alert_id"], ["alerts.id"],), + sa.ForeignKeyConstraint(["changed_by_fk"], ["ab_user.id"],), + sa.ForeignKeyConstraint(["created_by_fk"], ["ab_user.id"],), + sa.PrimaryKeyConstraint("id"), + ) + op.create_table( + "sql_observers", + sa.Column("created_on", sa.DateTime(), nullable=True), + sa.Column("changed_on", sa.DateTime(), nullable=True), + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("sql", sa.Text(), nullable=False), + sa.Column("created_by_fk", sa.Integer(), nullable=True), + sa.Column("changed_by_fk", sa.Integer(), nullable=True), + sa.Column("alert_id", sa.Integer(), nullable=False), + sa.Column("database_id", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(["alert_id"], ["alerts.id"],), + sa.ForeignKeyConstraint(["changed_by_fk"], ["ab_user.id"],), + sa.ForeignKeyConstraint(["created_by_fk"], ["ab_user.id"],), + sa.ForeignKeyConstraint(["database_id"], ["dbs.id"],), + sa.PrimaryKeyConstraint("id"), + ) + op.create_table( + 
"sql_observations", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("dttm", sa.DateTime(), nullable=True), + sa.Column("observer_id", sa.Integer(), nullable=False), + sa.Column("alert_id", sa.Integer(), nullable=True), + sa.Column("value", sa.Float(), nullable=True), + sa.Column("error_msg", sa.String(length=500), nullable=True), + sa.ForeignKeyConstraint(["alert_id"], ["alerts.id"],), + sa.ForeignKeyConstraint(["observer_id"], ["sql_observers.id"],), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_sql_observations_dttm"), "sql_observations", ["dttm"], unique=False + ) + + with op.batch_alter_table("alerts") as batch_op: + batch_op.add_column(sa.Column("changed_by_fk", sa.Integer(), nullable=True)) + batch_op.add_column(sa.Column("changed_on", sa.DateTime(), nullable=True)) + batch_op.add_column(sa.Column("created_by_fk", sa.Integer(), nullable=True)) + batch_op.add_column(sa.Column("created_on", sa.DateTime(), nullable=True)) + batch_op.alter_column( + "crontab", existing_type=mysql.VARCHAR(length=50), nullable=False + ) + batch_op.create_foreign_key( + "alerts_ibfk_3", "ab_user", ["changed_by_fk"], ["id"] + ) + batch_op.create_foreign_key( + "alerts_ibfk_4", "ab_user", ["created_by_fk"], ["id"] + ) + batch_op.drop_column("sql") + batch_op.drop_column("database_id") + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table("alerts") as batch_op: + batch_op.add_column( + sa.Column( + "database_id", mysql.INTEGER(), autoincrement=False, nullable=False + ) + ) + batch_op.add_column(sa.Column("sql", mysql.TEXT(), nullable=True)) + batch_op.drop_constraint("alerts_ibfk_3", type_="foreignkey") + batch_op.drop_constraint("alerts_ibfk_4", type_="foreignkey") + batch_op.alter_column( + "crontab", existing_type=mysql.VARCHAR(length=50), nullable=True + ) + batch_op.drop_column("created_on") + batch_op.drop_column("created_by_fk") + batch_op.drop_column("changed_on") + batch_op.drop_column("changed_by_fk") + + op.drop_index(op.f("ix_sql_observations_dttm"), table_name="sql_observations") + op.drop_table("sql_observations") + op.drop_table("sql_observers") + op.drop_table("alert_validators") + # ### end Alembic commands ### diff --git a/superset/models/alerts.py b/superset/models/alerts.py index cbea6571c588c..a62aacc419fe5 100644 --- a/superset/models/alerts.py +++ b/superset/models/alerts.py @@ -15,22 +15,27 @@ # specific language governing permissions and limitations # under the License. 
"""Models for scheduled execution of jobs""" +import textwrap from datetime import datetime +from typing import Any, Optional from flask_appbuilder import Model from sqlalchemy import ( Boolean, Column, DateTime, + Float, ForeignKey, Integer, String, Table, Text, ) -from sqlalchemy.orm import backref, relationship +from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy.orm import backref, relationship, RelationshipProperty -from superset import security_manager +from superset import db, security_manager +from superset.models.helpers import AuditMixinNullable metadata = Model.metadata # pylint: disable=no-member @@ -44,23 +49,23 @@ ) -class Alert(Model): +class Alert(Model, AuditMixinNullable): """Schedules for emailing slices / dashboards""" __tablename__ = "alerts" id = Column(Integer, primary_key=True) - label = Column(String(150)) + label = Column(String(150), nullable=False) active = Column(Boolean, default=True, index=True) - crontab = Column(String(50)) - sql = Column(Text) + crontab = Column(String(50), nullable=False) alert_type = Column(String(50)) owners = relationship(security_manager.user_model, secondary=alert_owner) recipients = Column(Text) slack_channel = Column(Text) + # TODO: implement log_retention log_retention = Column(Integer, default=90) grace_period = Column(Integer, default=60 * 60 * 24) @@ -70,13 +75,6 @@ class Alert(Model): dashboard_id = Column(Integer, ForeignKey("dashboards.id")) dashboard = relationship("Dashboard", backref="alert", foreign_keys=[dashboard_id]) - database_id = Column(Integer, ForeignKey("dbs.id"), nullable=False) - database = relationship( - "Database", - foreign_keys=[database_id], - backref=backref("alerts", cascade="all, delete-orphan"), - ) - last_eval_dttm = Column(DateTime, default=datetime.utcnow) last_state = Column(String(10)) @@ -100,3 +98,106 @@ class AlertLog(Model): @property def duration(self) -> int: return (self.dttm_end - self.dttm_start).total_seconds() + + +# TODO: Currently 
SQLObservation table will constantly grow with no limit, +# add some retention restriction or move to a more scalable db e.g. +# https://github.com/apache/incubator-superset/blob/master/superset/utils/log.py#L32 +class SQLObserver(Model, AuditMixinNullable): + """Runs SQL-based queries for alerts""" + + __tablename__ = "sql_observers" + + id = Column(Integer, primary_key=True) + sql = Column(Text, nullable=False) + + @declared_attr + def alert_id(self) -> int: + return Column(Integer, ForeignKey("alerts.id"), nullable=False) + + @declared_attr + def alert(self) -> RelationshipProperty: + return relationship( + "Alert", + foreign_keys=[self.alert_id], + backref=backref("sql_observer", cascade="all, delete-orphan"), + ) + + @declared_attr + def database_id(self) -> int: + return Column(Integer, ForeignKey("dbs.id"), nullable=False) + + @declared_attr + def database(self) -> RelationshipProperty: + return relationship( + "Database", + foreign_keys=[self.database_id], + backref=backref("sql_observers", cascade="all, delete-orphan"), + ) + + def get_last_observation(self) -> Optional[Any]: + observations = list( + db.session.query(SQLObservation) + .filter_by(observer_id=self.id) + .order_by(SQLObservation.dttm.desc()) + .limit(1) + ) + + if observations: + return observations[0] + + return None + + +class SQLObservation(Model): # pylint: disable=too-few-public-methods + """Keeps track of values retrieved from SQLObservers""" + + __tablename__ = "sql_observations" + + id = Column(Integer, primary_key=True) + dttm = Column(DateTime, default=datetime.utcnow, index=True) + observer_id = Column(Integer, ForeignKey("sql_observers.id"), nullable=False) + observer = relationship( + "SQLObserver", + foreign_keys=[observer_id], + backref=backref("observations", cascade="all, delete-orphan"), + ) + alert_id = Column(Integer, ForeignKey("alerts.id")) + alert = relationship( + "Alert", + foreign_keys=[alert_id], + backref=backref("observations", cascade="all, delete-orphan"), + ) + 
value = Column(Float) + error_msg = Column(String(500)) + + +class Validator(Model, AuditMixinNullable): + """Used to determine how an alert and its observations should be validated""" + + __tablename__ = "alert_validators" + + id = Column(Integer, primary_key=True) + validator_type = Column(String(100), nullable=False) + config = Column( + Text, + default=textwrap.dedent( + """ + { + + } + """ + ), + ) + + @declared_attr + def alert_id(self) -> int: + return Column(Integer, ForeignKey("alerts.id"), nullable=False) + + @declared_attr + def alert(self) -> RelationshipProperty: + return relationship( + "Alert", + foreign_keys=[self.alert_id], + backref=backref("validators", cascade="all, delete-orphan"), + ) diff --git a/superset/tasks/alerts/__init__.py b/superset/tasks/alerts/__init__.py new file mode 100644 index 0000000000000..fd9417fe5c1e9 --- /dev/null +++ b/superset/tasks/alerts/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/superset/tasks/alerts/observer.py b/superset/tasks/alerts/observer.py new file mode 100644 index 0000000000000..f7c5373fd72da --- /dev/null +++ b/superset/tasks/alerts/observer.py @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +from datetime import datetime +from typing import Optional + +import pandas as pd + +from superset import db +from superset.models.alerts import Alert, SQLObservation +from superset.sql_parse import ParsedQuery + +logger = logging.getLogger("tasks.email_reports") + + +def observe(alert_id: int) -> Optional[str]: + """ + Runs the SQL query in an alert's SQLObserver and then + stores the result in a SQLObservation. 
+ Returns an error message if the observer value was not valid + """ + + alert = db.session.query(Alert).filter_by(id=alert_id).one() + sql_observer = alert.sql_observer[0] + + value = None + + parsed_query = ParsedQuery(sql_observer.sql) + sql = parsed_query.stripped() + df = sql_observer.database.get_df(sql) + + error_msg = validate_observer_result(df, alert.id, alert.label) + + if not error_msg and df.to_records()[0][1] is not None: + value = float(df.to_records()[0][1]) + + observation = SQLObservation( + observer_id=sql_observer.id, + alert_id=alert_id, + dttm=datetime.utcnow(), + value=value, + error_msg=error_msg, + ) + + db.session.add(observation) + db.session.commit() + + return error_msg + + +def validate_observer_result( + sql_result: pd.DataFrame, alert_id: int, alert_label: str +) -> Optional[str]: + """ + Verifies if a DataFrame SQL query result to see if + it contains a valid value for a SQLObservation. + Returns an error message if the result is invalid. + """ + try: + assert ( + not sql_result.empty + ), f"Observer for alert <{alert_id}:{alert_label}> returned no rows" + + rows = sql_result.to_records() + + assert ( + len(rows) == 1 + ), f"Observer for alert <{alert_id}:{alert_label}> returned more than 1 row" + + assert ( + len(rows[0]) == 2 + ), f"Observer for alert <{alert_id}:{alert_label}> returned more than 1 column" + + if rows[0][1] is None: + return None + + float(rows[0][1]) + + except AssertionError as error: + return str(error) + except (TypeError, ValueError): + return ( + f"Observer for alert <{alert_id}:{alert_label}> returned a non-number value" + ) + + return None diff --git a/superset/tasks/alerts/validator.py b/superset/tasks/alerts/validator.py new file mode 100644 index 0000000000000..56dfad4dd63b0 --- /dev/null +++ b/superset/tasks/alerts/validator.py @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import enum +import json +from operator import eq, ge, gt, le, lt, ne +from typing import Callable, Optional + +import numpy as np + +from superset.exceptions import SupersetException +from superset.models.alerts import SQLObserver + +OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne} + + +class AlertValidatorType(enum.Enum): + not_null = "not null" + operator = "operator" + + @classmethod + def valid_type(cls, validator_type: str) -> bool: + return any(val_type.value == validator_type for val_type in cls) + + +def check_validator(validator_type: str, config: str) -> None: + if not AlertValidatorType.valid_type(validator_type): + raise SupersetException( + f"Error: {validator_type} is not a valid validator type." + ) + + config_dict = json.loads(config) + + if validator_type == AlertValidatorType.operator.value: + + if not (config_dict.get("op") and config_dict.get("threshold")): + raise SupersetException( + "Error: Operator Validator needs specified operator and threshold " + 'values. Add "op" and "threshold" to config.' + ) + + if not config_dict["op"] in OPERATOR_FUNCTIONS.keys(): + raise SupersetException( + f'Error: {config_dict["op"]} is an invalid operator type. 
Change ' + f'the "op" value in the config to one of ' + f'["<", "<=", ">", ">=", "==", "!="]' + ) + + if not isinstance(config_dict["threshold"], (int, float)): + raise SupersetException( + f'Error: {config_dict["threshold"]} is an invalid threshold value.' + f' Change the "threshold" value in the config.' + ) + + +def not_null_validator( + observer: SQLObserver, validator_config: str # pylint: disable=unused-argument
) -> bool: + """Returns True if a SQLObserver's recent observation is not NULL""" + + observation = observer.get_last_observation() + # TODO: Validate malformed observations/observations with errors separately + if ( + not observation + or observation.error_msg + or observation.value in (0, None, np.nan) + ): + return False + return True + + +def operator_validator(observer: SQLObserver, validator_config: str) -> bool: + """ + Returns True if applying the operator and threshold given in the validator + config to a SQLObserver's most recent observation evaluates to True + """ + + observation = observer.get_last_observation() + if observation and observation.value not in (None, np.nan): + operator = json.loads(validator_config)["op"] + threshold = json.loads(validator_config)["threshold"] + if OPERATOR_FUNCTIONS[operator](observation.value, threshold): + return True + + return False + + +def get_validator_function( + validator_type: str, +) -> Optional[Callable[[SQLObserver, str], bool]]: + """Returns a validation function based on validator_type""" + + alert_validators = { + AlertValidatorType.not_null.value: not_null_validator, + AlertValidatorType.operator.value: operator_validator, + } + if alert_validators.get(validator_type.lower()): + return alert_validators[validator_type.lower()] + + return None diff --git a/superset/tasks/schedules.py b/superset/tasks/schedules.py index 2969bb6eac7c9..a4a7b5d87be7b 100644 --- a/superset/tasks/schedules.py +++ b/superset/tasks/schedules.py @@ -37,7 +37,6 @@ from urllib.error import URLError # pylint: disable=ungrouped-imports import 
croniter -import pandas as pd import simplejson as json from celery.app.task import Task from dateutil.tz import tzlocal @@ -52,7 +51,6 @@ from superset import app, db, security_manager, thumbnail_cache from superset.extensions import celery_app, machine_auth_provider_factory from superset.models.alerts import Alert, AlertLog -from superset.models.core import Database from superset.models.dashboard import Dashboard from superset.models.schedules import ( EmailDeliveryType, @@ -61,7 +59,8 @@ SliceEmailReportFormat, ) from superset.models.slice import Slice -from superset.sql_parse import ParsedQuery +from superset.tasks.alerts.observer import observe +from superset.tasks.alerts.validator import get_validator_function from superset.tasks.slack_util import deliver_slack_msg from superset.utils.core import get_email_address_list, send_email_smtp from superset.utils.screenshots import ChartScreenshot, WebDriverProxy @@ -74,7 +73,6 @@ from werkzeug.datastructures import TypeConversionDict from flask_appbuilder.security.sqla.models import User - # Globals config = app.config logger = logging.getLogger("tasks.email_reports") @@ -106,6 +104,7 @@ class ScreenshotData(NamedTuple): class AlertContent(NamedTuple): label: str # alert name sql: str # sql statement for alert + observation_value: str # value from observation that triggered the alert alert_url: str # url to alert details image_data: Optional[ScreenshotData] # data for the alert screenshot @@ -539,15 +538,7 @@ def schedule_alert_query( # pylint: disable=unused-argument return if report_type == ScheduleType.alert: - if recipients or slack_channel: - deliver_alert(schedule.id, recipients, slack_channel) - return - - if run_alert_query( - schedule.id, schedule.database_id, schedule.sql, schedule.label - ): - # deliver_dashboard OR deliver_slice - return + evaluate_alert(schedule.id, schedule.label, recipients, slack_channel) else: raise RuntimeError("Unknown report type") except NoSuchColumnError as column_error: @@ 
-565,18 +556,35 @@ class AlertState: def deliver_alert( - alert_id: int, recipients: Optional[str] = None, slack_channel: Optional[str] = None + alert_id: int, + recipients: Optional[str] = None, + slack_channel: Optional[str] = None, ) -> None: + """ + Gathers alert information and sends out the alert + to its respective email and slack recipients + """ + alert = db.session.query(Alert).get(alert_id) logging.info("Triggering alert: %s", alert) + + # Set all the values for the alert report + # Alternate values are used in the case of a test alert + # where an alert has no observations yet recipients = recipients or alert.recipients slack_channel = slack_channel or alert.slack_channel + sql = alert.sql_observer[0].sql if alert.sql_observer else "" + observation_value = ( + str(alert.observations[-1].value) if alert.observations else "Value" + ) + # TODO: add sql query results and validator information to alert content if alert.slice: alert_content = AlertContent( alert.label, - alert.sql, + sql, + observation_value, _get_url_path("AlertModelView.show", user_friendly=True, pk=alert_id), _get_slice_screenshot(alert.slice.id), ) @@ -584,7 +592,8 @@ def deliver_alert( # TODO: dashboard delivery! 
alert_content = AlertContent( alert.label, - alert.sql, + sql, + observation_value, _get_url_path("AlertModelView.show", user_friendly=True, pk=alert_id), None, ) @@ -596,7 +605,7 @@ def deliver_alert( def deliver_email_alert(alert_content: AlertContent, recipients: str) -> None: - # TODO add sql query results to email + """Delivers an email alert to the given email recipients""" subject = f"[Superset] Triggered alert: {alert_content.label}" deliver_as_group = False data = None @@ -613,6 +622,7 @@ def deliver_email_alert(alert_content: AlertContent, recipients: str) -> None: alert_url=alert_content.alert_url, label=alert_content.label, sql=alert_content.sql, + observation_value=alert_content.observation_value, image_url=image_url, ) @@ -620,6 +630,8 @@ def deliver_email_alert(alert_content: AlertContent, recipients: str) -> None: def deliver_slack_alert(alert_content: AlertContent, slack_channel: str) -> None: + """Delivers a slack alert to the given slack channel""" + subject = __("[Alert] %(label)s", label=alert_content.label) image = None @@ -628,6 +640,7 @@ def deliver_slack_alert(alert_content: AlertContent, slack_channel: str) -> None "slack/alert.txt", label=alert_content.label, sql=alert_content.sql, + observation_value=alert_content.observation_value, url=alert_content.image_data.url, alert_url=alert_content.alert_url, ) @@ -637,6 +650,7 @@ def deliver_slack_alert(alert_content: AlertContent, slack_channel: str) -> None "slack/alert_no_screenshot.txt", label=alert_content.label, sql=alert_content.sql, + observation_value=alert_content.observation_value, alert_url=alert_content.alert_url, ) @@ -645,55 +659,48 @@ def deliver_slack_alert(alert_content: AlertContent, slack_channel: str) -> None ) -def run_alert_query( - alert_id: int, database_id: int, sql: str, label: str -) -> Optional[bool]: - """ - Execute alert.sql and return value if any rows are returned - """ - logger.info("Processing alert ID: %i", alert_id) - database = 
db.session.query(Database).get(database_id) - if not database: - logger.error("Alert database not preset") - return None - - if not sql: - logger.error("Alert SQL not preset") - return None +def evaluate_alert( + alert_id: int, + label: str, + recipients: Optional[str] = None, + slack_channel: Optional[str] = None, +) -> None: + """Processes an alert to see if it should be triggered""" - parsed_query = ParsedQuery(sql) - sql = parsed_query.stripped() + logger.info("Processing alert ID: %i", alert_id) state = None dttm_start = datetime.utcnow() - df = pd.DataFrame() try: - logger.info("Evaluating SQL for alert <%s:%s>", alert_id, label) - df = database.get_df(sql) + logger.info("Querying observers for alert <%s:%s>", alert_id, label) + error_msg = observe(alert_id) + if error_msg: + state = AlertState.ERROR + logging.error(error_msg) except Exception as exc: # pylint: disable=broad-except state = AlertState.ERROR logging.exception(exc) - logging.error("Failed at evaluating alert: %s (%s)", label, alert_id) + logging.error("Failed at query observers for alert: %s (%s)", label, alert_id) dttm_end = datetime.utcnow() - last_eval_dttm = datetime.utcnow() if state != AlertState.ERROR: - if not df.empty: - # Looking for truthy cells - for row in df.to_records(): - if any(row): - state = AlertState.TRIGGER - deliver_alert(alert_id) - break - if not state: + # Don't validate alert on test runs since it may not be triggered + if recipients or slack_channel: + deliver_alert(alert_id, recipients, slack_channel) + state = AlertState.TRIGGER + # Validate during regular workflow and deliver only if triggered + elif validate_observations(alert_id, label): + deliver_alert(alert_id, recipients, slack_channel) + state = AlertState.TRIGGER + else: state = AlertState.PASS db.session.commit() alert = db.session.query(Alert).get(alert_id) if state != AlertState.ERROR: - alert.last_eval_dttm = last_eval_dttm + alert.last_eval_dttm = dttm_end alert.last_state = state alert.logs.append( 
AlertLog( @@ -705,7 +712,23 @@ def run_alert_query( ) db.session.commit() - return None + +def validate_observations(alert_id: int, label: str) -> bool: + """ + Runs an alert's validators to check if it should be triggered or not + If so, return the name of the validator that returned true + """ + + logger.info("Validating observations for alert <%s:%s>", alert_id, label) + + alert = db.session.query(Alert).get(alert_id) + if alert.validators: + validator = alert.validators[0] + validate = get_validator_function(validator.validator_type) + if validate and validate(alert.sql_observer[0], validator.config): + return True + + return False def next_schedules( diff --git a/superset/templates/email/alert.txt b/superset/templates/email/alert.txt index 0a2c623b91535..50ca6aa9cd491 100644 --- a/superset/templates/email/alert.txt +++ b/superset/templates/email/alert.txt @@ -18,7 +18,8 @@ -->
SQL Statement:
-{{sql}}
+{{sql}}
+SQL Result: {{observation_value}}
Click here or the image below to view the chart related to this alert.
diff --git a/superset/templates/slack/alert.txt b/superset/templates/slack/alert.txt index 2264eea77cedf..80cfaa9c2c1ab 100644 --- a/superset/templates/slack/alert.txt +++ b/superset/templates/slack/alert.txt @@ -17,6 +17,7 @@ under the License. #} *Triggered Alert: {{label}} :redalert:* -SQL Statement:```{{sql}}``` +*SQL* *Statement*:```{{sql}}``` +*SQL* *Result*: {{observation_value}} <{{alert_url}}|View Alert Details> <{{url}}|*Explore in Superset*> diff --git a/superset/templates/slack/alert_no_screenshot.txt b/superset/templates/slack/alert_no_screenshot.txt index 84b74dafcb184..4e31f36201754 100644 --- a/superset/templates/slack/alert_no_screenshot.txt +++ b/superset/templates/slack/alert_no_screenshot.txt @@ -17,5 +17,6 @@ under the License. #} *Triggered Alert: {{label}} :redalert:* -SQL Statement:```{{sql}}``` +*SQL* *Statement*:```{{sql}}``` +*SQL* *Result*: {{observation_value}} <{{alert_url}}|View Alert Details> diff --git a/superset/views/alerts.py b/superset/views/alerts.py index 70573cbff3428..c550cc3e1d07b 100644 --- a/superset/views/alerts.py +++ b/superset/views/alerts.py @@ -23,9 +23,17 @@ from wtforms import BooleanField, Form, StringField from superset.constants import RouteMethod -from superset.models.alerts import Alert, AlertLog +from superset.models.alerts import ( + Alert, + AlertLog, + SQLObservation, + SQLObserver, + Validator, +) from superset.models.schedules import ScheduleType +from superset.tasks.alerts.validator import check_validator from superset.tasks.schedules import schedule_alert_query +from superset.utils import core as utils from superset.utils.core import get_email_address_str, markdown from ..exceptions import SupersetException @@ -47,6 +55,127 @@ class AlertLogModelView( ) +class AlertObservationModelView( + CompactCRUDMixin, SupersetModelView +): # pylint: disable=too-many-ancestors + datamodel = SQLAInterface(SQLObservation) + include_route_methods = {RouteMethod.LIST} | {"show"} + list_title = _("List Observations") + 
show_title = _("Show Observation") + list_columns = ( + "dttm", + "value", + "error_msg", + ) + label_columns = { + "error_msg": _("Error Message"), + } + + +# TODO: add a button to the form to test if the SQL statement can run with no errors +class SQLObserverInlineView( # pylint: disable=too-many-ancestors + CompactCRUDMixin, SupersetModelView +): + datamodel = SQLAInterface(SQLObserver) + include_route_methods = RouteMethod.RELATED_VIEW_SET | RouteMethod.API_SET + list_title = _("SQL Observers") + show_title = _("Show SQL Observer") + add_title = _("Add SQL Observer") + edit_title = _("Edit SQL Observer") + + edit_columns = [ + "alert", + "database", + "sql", + ] + + add_columns = edit_columns + + list_columns = ["alert.label", "database", "sql"] + + label_columns = { + "alert": _("Alert"), + "database": _("Database"), + "sql": _("SQL"), + } + + description_columns = { + "sql": _( + "A SQL statement that defines whether the alert should get triggered or " + "not. The query is expected to return either NULL or a number value." 
+ ) + } + + def pre_add(self, item: "SQLObserverInlineView") -> None: + if item.alert.sql_observer and item.alert.sql_observer[0].id != item.id: + raise SupersetException("Error: An alert should only have one observer.") + + +class ValidatorInlineView( # pylint: disable=too-many-ancestors + CompactCRUDMixin, SupersetModelView +): + datamodel = SQLAInterface(Validator) + include_route_methods = RouteMethod.RELATED_VIEW_SET | RouteMethod.API_SET + list_title = _("Validators") + show_title = _("Show Validator") + add_title = _("Add Validator") + edit_title = _("Edit Validator") + + edit_columns = [ + "alert", + "validator_type", + "config", + ] + + add_columns = edit_columns + + list_columns = [ + "validator_type", + "alert.label", + ] + + label_columns = { + "validator_type": _("Validator Type"), + "alert": _("Alert"), + } + + description_columns = { + "validator_type": utils.markdown( + "Determines when to trigger alert based off value from SQLObserver query. " + "Alerts will be triggered with these validator types:" + "