Log batch job failures to Slack #2561

Merged · 9 commits · Mar 2, 2022
60 changes: 60 additions & 0 deletions .github/workflows/ingestion-error-reporter-deploy.yml
@@ -0,0 +1,60 @@
name: Ingestion error reporter deploy

on:
  push:
    branches: [main, '*-stable']
    paths:
      - '.github/workflows/ingestion-error-reporter-deploy.yml'
      - 'ingestion/monitoring/errorLogsToSlack.py'
      - 'ingestion/monitoring/pyproject.toml'
      - 'ingestion/monitoring/poetry.lock'
    # Build whenever a new tag is created.
    tags:
      - "*"
  # workflow_dispatch takes no branch/path filters; manual runs are always allowed.
  workflow_dispatch:

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Build, tag, and push image to Amazon ECR (latest)
        if: ${{ github.ref == 'refs/heads/main' }}
        working-directory: ingestion/monitoring
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          ECR_REPOSITORY: gdh-ingestor-error-reporter
          IMAGE_TAG: ${{ github.sha }}
        run: |
          docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -t $ECR_REGISTRY/$ECR_REPOSITORY:latest -f Dockerfile .
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:latest

      - name: Build, tag, and push image to Amazon ECR (stable)
        if: ${{ endsWith(github.ref, '-stable') }}
        working-directory: ingestion/monitoring
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          ECR_REPOSITORY: gdh-ingestor-error-reporter
          IMAGE_TAG: ${{ github.sha }}
        run: |
          docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -t $ECR_REGISTRY/$ECR_REPOSITORY:stable -f Dockerfile .
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:stable
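As a post-deploy sanity check, something like the following sketch can confirm the tags landed in ECR. It assumes the `gdh-ingestor-error-reporter` repository name and `us-east-1` region used in the workflow above; `describe_images` is the standard boto3 ECR call.

```python
# Sketch: confirm the freshly pushed tags exist in ECR.
# Repository name and region are taken from the workflow above.
import boto3

ecr = boto3.client("ecr", region_name="us-east-1")
result = ecr.describe_images(
    repositoryName="gdh-ingestor-error-reporter",
    imageIds=[{"imageTag": "latest"}],
)
print(result["imageDetails"][0]["imageTags"])
```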
72 changes: 72 additions & 0 deletions ingestion/monitoring/Dockerfile
@@ -0,0 +1,72 @@
# `python-base` sets up all our shared environment variables
FROM python:3.10-slim as python-base

ENV PYTHONUNBUFFERED=1 \
    # prevents python creating .pyc files
    PYTHONDONTWRITEBYTECODE=1 \
    \
    PIP_NO_CACHE_DIR=off \
    PIP_DISABLE_PIP_VERSION_CHECK=on \
    PIP_DEFAULT_TIMEOUT=100 \
    \
    # https://python-poetry.org/docs/configuration/#using-environment-variables
    POETRY_VERSION=1.1.13 \
    # make poetry install to this location
    POETRY_HOME="/opt/poetry" \
    # make poetry create the virtual environment in the project's root
    # it gets named `.venv`
    POETRY_VIRTUALENVS_IN_PROJECT=true \
    # do not ask any interactive question
    POETRY_NO_INTERACTION=1 \
    \
    # this is where our requirements + virtual environment will live
    PYSETUP_PATH="/opt/pysetup" \
    VENV_PATH="/opt/pysetup/.venv"

# prepend poetry and venv to path
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"

# `builder-base` stage is used to build deps + create our virtual environment
FROM python-base as builder-base
RUN apt-get update \
    && apt-get install --no-install-recommends -y \
        curl \
        build-essential

# install poetry - the installer respects $POETRY_VERSION & $POETRY_HOME
RUN curl -sSL https://install.python-poetry.org | python3 -

# copy project requirement files here to ensure they will be cached.
WORKDIR $PYSETUP_PATH
COPY poetry.lock pyproject.toml ./

# install runtime deps - uses $POETRY_VIRTUALENVS_IN_PROJECT internally
RUN poetry install --no-dev

# `development` image is used during development / testing
FROM python-base as development

RUN apt-get update \
    && apt-get install --no-install-recommends -y \
        curl \
        awscli

WORKDIR $PYSETUP_PATH

# copy in our built poetry + venv
COPY --from=builder-base $POETRY_HOME $POETRY_HOME
COPY --from=builder-base $PYSETUP_PATH $PYSETUP_PATH

# will become mountpoint of our code
WORKDIR /app

COPY errorLogsToSlack.py poetry.lock pyproject.toml ./

# quicker install as runtime deps are already installed
RUN poetry install

# notice I haven't set the environment variables or args here.
# the slack webhook should be configured in the job definition,
# and the args should be configured at submission time
CMD ["poetry", "run", "python3", "./errorLogsToSlack.py"]
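Since the webhook lives in the job definition and the log group/stream arrive as arguments, a submission could look like the sketch below. The `jobQueue` and `jobDefinition` names are hypothetical placeholders, not values from this repository; `submit_job` and `containerOverrides` are the standard boto3 Batch API.

```python
# Sketch of submitting the reporter to AWS Batch; queue and definition
# names are hypothetical. The job definition is assumed to set SLACK_WEBHOOK.
import boto3

batch = boto3.client("batch", region_name="us-east-1")
batch.submit_job(
    jobName="ingestion-error-report",
    jobQueue="monitoring",                      # hypothetical queue
    jobDefinition="ingestion-error-reporter",   # hypothetical definition
    containerOverrides={
        # group and stream are passed as args at submission time
        "command": ["poetry", "run", "python3", "./errorLogsToSlack.py",
                    "/aws/batch/job", "example-stream"],
    },
)
```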
17 changes: 17 additions & 0 deletions ingestion/monitoring/README.md
@@ -1,3 +1,20 @@
# Error monitoring

The `errorLogsToSlack.py` script reads log messages from a given CloudWatch log
stream and posts any errors to Slack. It takes three inputs:

- `SLACK_WEBHOOK`, the webhook URL for posting messages to Slack, passed via the environment.
- the CloudWatch log group name, passed as the first command-line argument.
- the CloudWatch log stream name, passed as the second command-line argument.

Typically, these would be set by EventBridge in AWS when the script runs in Batch. For local testing, something like the sketch below should work.
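```python
# Hypothetical local invocation; the group/stream names and the webhook
# URL are placeholders for illustration only.
import os
import subprocess

env = dict(os.environ,
           SLACK_WEBHOOK="https://hooks.slack.com/services/T000/B000/XXXX")
subprocess.run(
    ["poetry", "run", "python3", "errorLogsToSlack.py",
     "/aws/batch/job", "example-parser/default/0123456789abcdef"],
    env=env,
    check=True,
)
```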

## To set up for a new instance

1. See https://api.slack.com/messaging/webhooks for details on creating a Slack app and enabling webhooks; a quick smoke test is sketched after this list.
2. Change the Slack user IDs in the script to ones that represent users in your workspace (who should be notified of ingestion errors).
3. Deploy to Batch.
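
To verify a new webhook before deploying, a minimal sketch (the URL is a placeholder):

```python
# Minimal webhook smoke test; the URL is a placeholder, not a real webhook.
import json

import requests

resp = requests.post(
    "https://hooks.slack.com/services/T000/B000/XXXXXXXX",
    data=json.dumps({"text": "[TEST] webhook reachable"}),
    headers={"Content-Type": "application/json"},
)
resp.raise_for_status()  # Slack returns 200 with body "ok" on success
```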

# Data monitoring

Data monitoring scripts, currently there's a script to alert daily about
99 changes: 99 additions & 0 deletions ingestion/monitoring/errorLogsToSlack.py
@@ -0,0 +1,99 @@
import argparse
import json
import logging
import os
import sys
from time import sleep

import boto3
import requests


class SlackHandler(logging.Handler):
    def __init__(self, webhook_url, level=logging.NOTSET):
        super().__init__(level)
        self.slack_url = webhook_url

    def emit(self, record):
        message_header = {'Content-Type': 'application/json'}
        # record.message is only set once a formatter runs, so build the
        # text from getMessage() instead.
        message = {'text': f"[{record.levelname}] {record.getMessage()}"}
        response = requests.post(url=self.slack_url, data=json.dumps(message), headers=message_header)
        if response.status_code == 429:
            # rate limited: Slack advertises the wait in the Retry-After header
            sleep(int(response.headers.get('Retry-After', 1)))
        elif response.status_code != 200:
            raise ValueError(
                f"Request to slack returned an error {response.status_code}, the response is:\n{response.text}"
            )


def interpret(message):
    graham = "<@U011A0TFM7X>"
    abhishek = "<@U01F70FAJ6N>"
    jim = "<@U01TAHDR4F7>"
    engineers = f"{graham} {abhishek} {jim}"
    lower = message.lower()
    if "'daterange': {'start':" in lower:
        return (logging.INFO, f"BACKFILL INITIATED\n{message}")
    if "error" in lower:
        return (logging.ERROR, f"PARSER ERROR: {engineers}\n{message}")
    if "timed out" in lower:
        return (logging.ERROR, f"TIME OUT: {engineers}\n{message}")
    if lower.startswith('info:'):
        return (logging.INFO, message)
    return (logging.WARN, message)

def setup_logger():
    logger = logging.getLogger(__name__)
    # INFO, not WARN: interpret() emits INFO-level backfill notifications,
    # and the start/end banners in main are logged at INFO.
    logger.setLevel(logging.INFO)
    stdoutHandler = logging.StreamHandler(stream=sys.stdout)
    stdoutHandler.setLevel(logging.DEBUG)
    logger.addHandler(stdoutHandler)
    slackHandler = SlackHandler(os.getenv('SLACK_WEBHOOK'), logging.DEBUG)
    logger.addHandler(slackHandler)
    return logger

def log_messages(cloudwatch_response, logger):
    for message in [e['message'] for e in cloudwatch_response['events']]:
        (severity, output) = interpret(message)
        logger.log(severity, output)


if __name__ == '__main__':
    logger = setup_logger()
    parser = argparse.ArgumentParser()
    parser.add_argument("group", help="AWS log group name for the failed parser")
    parser.add_argument("stream", help="AWS log stream name for the failed parser")
    args = parser.parse_args()
    logGroup = args.group
    logStream = args.stream
    if not logGroup or not logStream:
        logger.critical(f"Cannot get messages from log group {logGroup} and stream {logStream}")
        sys.exit(1)
    logger.info(f"Output from {logGroup}/{logStream}:")
    logClient = boto3.client('logs')
    response = logClient.get_log_events(
        logGroupName=logGroup,
        logStreamName=logStream,
        startFromHead=True
    )
    log_messages(response, logger)
    # get_log_events always returns a nextForwardToken; page forward until
    # the token stops changing, which signals the end of the stream.
    oldNext = response['nextForwardToken']
    hasMore = bool(oldNext)
    while hasMore:
        response = logClient.get_log_events(
            logGroupName=logGroup,
            logStreamName=logStream,
            startFromHead=True,
            nextToken=oldNext
        )
        log_messages(response, logger)
        newNext = response['nextForwardToken']
        if (not newNext) or (newNext == oldNext):
            hasMore = False
        else:
            oldNext = newNext
    logger.info(f"End of output from {logGroup}/{logStream}")