Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log batch job failures to Slack #2561

Merged
merged 9 commits into from
Mar 2, 2022
Merged
94 changes: 94 additions & 0 deletions ingestion/monitoring/errorLogsToSlack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import json
import logging
import os
import requests
import re
import sys
from time import sleep

import boto3


class SlackHandler(logging.Handler):
    """Logging handler that posts every record to a Slack incoming webhook.

    Each record is sent as a JSON payload of the form
    ``{"text": "[LEVEL] message"}``.

    Args:
        webhook_url: URL of the Slack incoming webhook to post to.
        level: Minimum level this handler accepts (defaults to NOTSET).
    """

    # Total number of POST attempts per record while Slack answers 429.
    MAX_ATTEMPTS = 3
    # Seconds to wait for Slack to respond before the request is abandoned.
    REQUEST_TIMEOUT = 10
    # Pause (seconds) used when a 429 response has no usable Retry-After.
    DEFAULT_RETRY_DELAY = 1.0

    def __init__(self, webhook_url, level=logging.NOTSET):
        super().__init__(level)
        self.slack_url = webhook_url

    @classmethod
    def _retry_delay(cls, response):
        """Return how many seconds to wait before retrying a 429 response."""
        try:
            return max(0.0, float(response.headers.get('Retry-After')))
        except (TypeError, ValueError):
            # Header missing or not numeric: fall back to a short fixed pause.
            return cls.DEFAULT_RETRY_DELAY

    def emit(self, record):
        """Post ``record`` to Slack, retrying when the webhook is rate limited.

        Raises:
            ValueError: if the final response from Slack is not HTTP 200.
        """
        message_header = {'Content-Type': 'application/json'}
        # getMessage() merges msg and args itself; record.message is only set
        # once some formatter has already processed the record.
        message = {'text': f"[{record.levelname}] {record.getMessage()}"}
        payload = json.dumps(message)

        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            response = requests.post(
                url=self.slack_url,
                data=payload,
                headers=message_header,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 429 or attempt == self.MAX_ATTEMPTS:
                break
            # Rate limited: wait as long as Slack asks, then send again so the
            # message is not lost.
            sleep(self._retry_delay(response))

        if response.status_code != 200:
            raise ValueError(
                f"Request to slack returned an error {response.status_code}, the response is:\n{response.text}"
            )


def interpret(message):
    """Classify one log line and build the text to report for it.

    Matching is case-insensitive and the first matching rule wins:

    1. contains ``'dateRange': {'start':``  -> INFO, prefixed "BACKFILL INITIATED"
    2. contains ``error``                   -> ERROR, prefixed "PARSER ERROR" and
       the engineer mentions
    3. contains ``timed out``               -> ERROR, prefixed "TIME OUT" and the
       engineer mentions
    4. starts with ``info:``                -> INFO, message unchanged
    5. anything else                        -> WARN, message unchanged

    Args:
        message: Raw log line.

    Returns:
        A ``(severity, output)`` tuple: a ``logging`` level and the text to log.
    """
    # Slack user mentions included in error reports.
    graham = "<@U011A0TFM7X>"
    abhishek = "<@U01F70FAJ6N>"
    jim = "<@U01TAHDR4F7>"
    engineers = f"{graham} {abhishek} {jim}"

    lower = message.lower()
    if "'dateRange': {'start':".lower() in lower:
        return (logging.INFO, f"BACKFILL INITIATED\n{message}")
    if "error" in lower:
        return (logging.ERROR, f"PARSER ERROR: {engineers}\n{message}")
    if "timed out" in lower:
        return (logging.ERROR, f"TIME OUT: {engineers}\n{message}")
    if lower.startswith('info:'):
        return (logging.INFO, message)
    return (logging.WARN, message)

def setup_logger():
    """Create the module logger that writes to stdout and, if configured, Slack.

    The logger's own threshold is WARN, so records below that level are
    dropped before they reach either handler (both handlers accept DEBUG and
    above). The Slack handler is attached only when the ``SLACK_WEBHOOK``
    environment variable is set; without it there is no URL to post to, so a
    warning is written to stdout and logging continues on stdout alone.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.WARN)

    stdoutHandler = logging.StreamHandler(stream=sys.stdout)
    stdoutHandler.setLevel(logging.DEBUG)
    logger.addHandler(stdoutHandler)

    webhook_url = os.getenv('SLACK_WEBHOOK')
    if webhook_url:
        slackHandler = SlackHandler(webhook_url, logging.DEBUG)
        logger.addHandler(slackHandler)
    else:
        # Posting to a missing URL would fail on every record; say so once.
        logger.warning("SLACK_WEBHOOK is not set; messages will not be sent to Slack")
    return logger

def log_messages(cloudwatch_response, logger):
    """Log every event message contained in one CloudWatch Logs response.

    Args:
        cloudwatch_response: Mapping with an ``events`` list whose items each
            carry a ``message`` entry.
        logger: Logger that receives each message at the severity chosen by
            ``interpret``.
    """
    for event in cloudwatch_response['events']:
        severity, output = interpret(event['message'])
        logger.log(severity, output)


if __name__ == '__main__':
    # Script entry point: read one CloudWatch log stream from its beginning
    # and forward every message to the configured log handlers.
    logger = setup_logger()
    logGroup = os.getenv('INGESTION_LOG_GROUP')
    logStream = os.getenv('INGESTION_LOG_STREAM')
    if logGroup is None or logStream is None:
        logger.critical(f"Cannot get messages from log group {logGroup} and stream {logStream}")
        sys.exit(1)
    logger.info(f"Output from {logGroup}/{logStream}:")

    logClient = boto3.client('logs')
    request = {
        'logGroupName': logGroup,
        'logStreamName': logStream,
        'startFromHead': True,
    }
    oldNext = None
    while True:
        response = logClient.get_log_events(**request)
        log_messages(response, logger)
        newNext = response.get('nextForwardToken')
        # Stop when no token comes back or the token stops changing, which is
        # how the end of the stream is detected here.
        if (not newNext) or (newNext == oldNext):
            break
        request['nextToken'] = newNext
        oldNext = newNext
    logger.info(f"End of output from {logGroup}/{logStream}")
181 changes: 181 additions & 0 deletions ingestion/monitoring/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions ingestion/monitoring/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[tool.poetry]
name = "monitoring"
version = "0.1.0"
description = "Monitoring functions for Global.health ingestion"
authors = ["Global.health <info@global.health>"]
license = "MIT"

[tool.poetry.dependencies]
python = "^3.9"
boto3 = "^1.21.8"
requests = "^2.27.1"

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"