GitHub Actions - workflow Connector Integration Tests - add retry #14452

Merged Aug 1, 2022 · 30 commits
9f8c8d2
ci_integration_workflow_launcher.py added (raw version)
grubberr Jul 6, 2022
377cc71
UUID added
grubberr Jul 6, 2022
edad06a
fix UUID
grubberr Jul 6, 2022
5ec3b81
ci_integration_workflow_launcher.py updated
grubberr Jul 7, 2022
ddc3923
Merge branch 'master' into grubberr/14450-connector-integration-tests
grubberr Jul 7, 2022
5cd15eb
ORGANIZATION, REPOSITORY
grubberr Jul 7, 2022
206835d
run_id -> run_uuid
grubberr Jul 7, 2022
905fe53
test-command.yml updated
grubberr Jul 7, 2022
a17aa2d
ci_integration_workflow_launcher.sh -> ci_integration_workflow_launch…
grubberr Jul 7, 2022
c75c2a3
UUID regex updated
grubberr Jul 7, 2022
fc6d7d1
RUN_ID_REGEX -> RUN_UUID_REGEX
grubberr Jul 7, 2022
6f01a5f
check_start_aws_runner_failed
grubberr Jul 7, 2022
e53d462
SLEEP = 1200 added
grubberr Jul 7, 2022
f49e53a
scan backward 3 hours max
grubberr Jul 7, 2022
2a5e330
Merge branch 'master' into grubberr/14450-connector-integration-tests
grubberr Jul 7, 2022
d4ad73c
BRANCH="master"
grubberr Jul 7, 2022
dd9d552
"date" added for build_report.py
grubberr Jul 8, 2022
d3a86c2
Merge branch 'master' into grubberr/14450-connector-integration-tests
grubberr Jul 11, 2022
81c6e87
search_workflow_runs -> search_failed_workflow_runs
grubberr Jul 11, 2022
3fc3f7d
time.sleep(1) added
grubberr Jul 11, 2022
b3a66f8
Merge branch 'master' into grubberr/14450-connector-integration-tests
grubberr Jul 12, 2022
a1ab675
check only conclusion="failure"
grubberr Jul 12, 2022
0b69855
Merge branch 'master' into grubberr/14450-connector-integration-tests
grubberr Jul 15, 2022
0283ef2
ci_integration_workflow_launcher.sh - removed
grubberr Jul 15, 2022
9a83e6a
Merge branch 'master' into grubberr/14450-connector-integration-tests
grubberr Jul 22, 2022
98a8435
Merge branch 'master' into grubberr/14450-connector-integration-tests
grubberr Jul 29, 2022
035e91b
Merge branch 'master' into grubberr/14450-connector-integration-tests
grubberr Jul 30, 2022
9307a62
now we can pass integrations as CLI params
grubberr Aug 1, 2022
8cb1ad1
Merge branch 'master' into grubberr/14450-connector-integration-tests
grubberr Aug 1, 2022
cbd535a
Merge branch 'master' into grubberr/14450-connector-integration-tests
grubberr Aug 1, 2022
6 changes: 5 additions & 1 deletion .github/workflows/connector_integration_tests.yml
@@ -18,7 +18,11 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: '17'
- name: Install dependencies
Contributor:

nit: this isn't worth addressing unless you want to, but there is a python action that I think we already use in some other places

the main upside is version stability

Contributor Author:

OK, thank you, I will check this.

run: |
python -m pip install --upgrade pip
pip install requests
- name: Launch Integration Tests
run: ./tools/bin/ci_integration_workflow_launcher.sh
run: python ./tools/bin/ci_integration_workflow_launcher.py
env:
GITHUB_TOKEN: ${{ secrets.SLASH_COMMAND_PAT }}
5 changes: 5 additions & 0 deletions .github/workflows/test-command.yml
@@ -16,6 +16,9 @@ on:
comment-id:
description: "The comment-id of the slash command. Used to update the comment with the status."
required: false
uuid:
description: "Custom UUID of workflow run. Used because GitHub dispatches endpoint does not return workflow run id."
required: false

jobs:
find_valid_pat:
@@ -25,6 +28,8 @@
outputs:
pat: ${{ steps.variables.outputs.pat }}
steps:
- name: UUID ${{ github.event.inputs.uuid }}
run: true
- name: Checkout Airbyte
uses: actions/checkout@v2
- name: Check PAT rate limits
2 changes: 1 addition & 1 deletion tools/bin/build_report.py
@@ -92,7 +92,7 @@ def check_module(connector):
elif connector.startswith("destination"):
SUCCESS_DESTINATION.append(connector)
else:
failed_today = [connector, short_status, last_build["link"]]
failed_today = [connector, short_status, last_build["link"], last_build["date"]]

if len(history) > 1 and history[1]["status"] != "success":
FAILED_2_LAST.append(failed_today)
185 changes: 185 additions & 0 deletions tools/bin/ci_integration_workflow_launcher.py
@@ -0,0 +1,185 @@
#!/usr/bin/env python3
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import datetime
import logging
import os
import re
import subprocess
import sys
import time
import uuid
from urllib.parse import parse_qsl, urljoin, urlparse

import requests

ORGANIZATION = "airbytehq"
REPOSITORY = "airbyte"
LOGGING_FORMAT = "%(asctime)-15s %(levelname)s %(message)s"
API_URL = "https://api.github.com"
BRANCH = "master"
WORKFLOW_PATH = ".github/workflows/test-command.yml"
RUN_UUID_REGEX = re.compile("^UUID ([0-9a-f-]+)$")
SLEEP = 1200


GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
if not GITHUB_TOKEN:
logging.error("GITHUB_TOKEN not set...")
sys.exit(1)


def check_start_aws_runner_failed(jobs):
"""
!!! WARNING !!! WARNING !!! WARNING !!!
!!! WARNING !!! WARNING !!! WARNING !!!
!!! WARNING !!! WARNING !!! WARNING !!!

If the structure of workflow {WORKFLOW_PATH} changes in the future,
there is a chance that we will need to update this function too.
"""
return (
len(jobs) >= 2
and len(jobs[1]["steps"]) >= 3
and jobs[1]["steps"][2]["name"] == "Start AWS Runner"
and jobs[1]["steps"][2]["conclusion"] == "failure"
)
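
As a sanity check, the predicate above can be exercised against a hypothetical jobs payload. The field names follow the shape the script reads from the GitHub jobs API, but the values here are invented:

```python
def check_start_aws_runner_failed(jobs):
    # Same predicate as in the script: the second job's third step must
    # be named "Start AWS Runner" and must have concluded with "failure".
    return (
        len(jobs) >= 2
        and len(jobs[1]["steps"]) >= 3
        and jobs[1]["steps"][2]["name"] == "Start AWS Runner"
        and jobs[1]["steps"][2]["conclusion"] == "failure"
    )

# Invented payload shaped like the API response the script consumes.
failed_runner = [
    {"steps": [{"name": "Set up job"}, {"name": "UUID 1234"}]},
    {"steps": [
        {"name": "Set up job", "conclusion": "success"},
        {"name": "Checkout Airbyte", "conclusion": "success"},
        {"name": "Start AWS Runner", "conclusion": "failure"},
    ]},
]
print(check_start_aws_runner_failed(failed_runner))  # → True
```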


def get_run_uuid(jobs):
"""
This function relies on the assumption that the first step of the first job is

- name: UUID ${{ github.event.inputs.uuid }}
run: true
"""
if jobs and len(jobs[0]["steps"]) >= 2:
name = jobs[0]["steps"][1]["name"]
m = re.match(RUN_UUID_REGEX, name)
if m:
return m.groups()[0]
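
A quick sketch of the regex extraction above, using an invented step name in the shape the UUID step of test-command.yml produces:

```python
import re

RUN_UUID_REGEX = re.compile("^UUID ([0-9a-f-]+)$")

# Invented step name in the shape produced by the
# "UUID ${{ github.event.inputs.uuid }}" step.
name = "UUID 123e4567-e89b-12d3-a456-426614174000"
m = RUN_UUID_REGEX.match(name)
print(m.group(1))  # → 123e4567-e89b-12d3-a456-426614174000
```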


def get_response(url_or_path, params=None):
url = urljoin(API_URL, url_or_path)
response = requests.get(url, params=params, headers={"Authorization": "Bearer " + GITHUB_TOKEN})
response.raise_for_status()
return response


def get_response_json(url_or_path, params=None):
response = get_response(url_or_path, params=params)
return response.json()


def get_workflow_id(owner, repo, path):
response_json = get_response_json(f"/repos/{owner}/{repo}/actions/workflows")
for workflow in response_json["workflows"]:
if workflow["path"] == path:
return workflow["id"]


def workflow_dispatch(owner, repo, workflow_id, connector):
run_uuid = str(uuid.uuid4())
url = urljoin(API_URL, f"/repos/{owner}/{repo}/actions/workflows/{workflow_id}/dispatches")
response = requests.post(
url, headers={"Authorization": "Bearer " + GITHUB_TOKEN}, json={"ref": BRANCH, "inputs": {"connector": connector, "uuid": run_uuid}}
)
response.raise_for_status()
return run_uuid


def get_connector_names():
process = subprocess.run(["./gradlew", "integrationTest", "--dry-run"], check=True, capture_output=True, universal_newlines=True)

res = []
for line in process.stdout.splitlines():
parts = line.split(":")
if (
len(parts) >= 4
and parts[1] == "airbyte-integrations"
and parts[2] in ["connectors", "bases"]
and parts[-1] == "integrationTest SKIPPED"
):
res.append(parts[3])
return res
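
The split-and-filter logic above can be illustrated with invented lines in the style of `./gradlew integrationTest --dry-run` output (the real Gradle output may differ in detail); only connectors/bases modules with a skipped `integrationTest` task are kept:

```python
# Invented dry-run lines; the third one is filtered out because it is
# not under airbyte-integrations.
lines = [
    ":airbyte-integrations:connectors:source-postgres:integrationTest SKIPPED",
    ":airbyte-integrations:bases:base-normalization:integrationTest SKIPPED",
    ":airbyte-commons:test SKIPPED",
]

res = []
for line in lines:
    parts = line.split(":")
    if (
        len(parts) >= 4
        and parts[1] == "airbyte-integrations"
        and parts[2] in ["connectors", "bases"]
        and parts[-1] == "integrationTest SKIPPED"
    ):
        res.append(parts[3])
print(res)  # → ['source-postgres', 'base-normalization']
```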


def iter_workflow_runs(owner, repo, per_page=100):
path = f"/repos/{owner}/{repo}/actions/runs"
page = None
while True:
params = {"per_page": per_page}
if page:
params["page"] = page
response = get_response(path, params=params)
response_json = response.json()
for workflow_run in response_json["workflow_runs"]:
yield workflow_run
if "next" not in response.links:
break
page = dict(parse_qsl(urlparse(response.links["next"]["url"]).query))["page"]
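
The last line above pulls the next page number out of the URL that GitHub returns in its `Link` response header; a minimal sketch with an invented URL:

```python
from urllib.parse import parse_qsl, urlparse

# Invented "next" URL in the style of the GitHub Link response header.
next_url = "https://api.github.com/repos/airbytehq/airbyte/actions/runs?per_page=100&page=3"
page = dict(parse_qsl(urlparse(next_url).query))["page"]
print(page)  # → 3 (as a string; requests serializes it back into the query)
```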


def search_failed_workflow_runs(owner, repo, workflow_id, run_uuids):
run_uuids = set(run_uuids)
now = datetime.datetime.utcnow()
res = set()
for workflow_run in iter_workflow_runs(owner, repo):
if not run_uuids:
break

created_at = datetime.datetime.strptime(workflow_run["created_at"], "%Y-%m-%dT%H:%M:%SZ")
period = now - created_at
if period.seconds > 10800:
Contributor:

no problem with this timer length, but any logic behind the choice?

Contributor Author:

We scan all workflow runs backwards until we find all the ones we need by UUID.
The 3 hours is just additional protection (no particular reason, just 3 hours :)

If for some reason we did not find all the needed workflow runs by UUID,
we just break the loop when we start to reach "old" (older than 3 hours) runs.

break

if workflow_run["workflow_id"] != workflow_id:
continue
if workflow_run["head_branch"] != BRANCH:
continue
if workflow_run["conclusion"] != "failure":
continue

response_json = get_response_json(workflow_run["jobs_url"])
run_uuid = get_run_uuid(response_json["jobs"])
if not run_uuid:
continue

if run_uuid in run_uuids:
run_uuids.remove(run_uuid)
if check_start_aws_runner_failed(response_json["jobs"]):
res.add(run_uuid)
return res
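
The 3-hour cutoff discussed above can be sketched as follows. A small aside, not from the original script: `timedelta.seconds` counts only the sub-day remainder, so `total_seconds()` is the safer comparison once a run is more than a day old:

```python
import datetime

# Invented timestamps: a run created 25 hours before "now".
now = datetime.datetime(2022, 7, 7, 12, 0, 0)
created_at = datetime.datetime.strptime("2022-07-06T11:00:00Z", "%Y-%m-%dT%H:%M:%SZ")
period = now - created_at

# .seconds sees only the 1-hour remainder beyond the full day.
print(period.seconds > 10800)          # → False
# total_seconds() sees the full 25 hours.
print(period.total_seconds() > 10800)  # → True
```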


def main():
workflow_id = get_workflow_id(ORGANIZATION, REPOSITORY, WORKFLOW_PATH)
if not workflow_id:
logging.error(f"Cannot find workflow path '{WORKFLOW_PATH}'")
sys.exit(1)

connector_names = get_connector_names()
run_uuid_to_name = {}
for connector_name in connector_names:
run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name)
logging.info(f"Dispatch workflow for connector {connector_name}, UUID: {run_uuid}")
run_uuid_to_name[run_uuid] = connector_name
# to avoid overloading system
time.sleep(1)

logging.info(f"Sleeping {SLEEP} seconds")
time.sleep(SLEEP)
Contributor:

does this mean that if a connector takes longer than 20 minutes to run, it won't be retried? Are we OK with that?

is it possible to have this logic in the integration-test workflow (i.e. maybe it could accept an auto_retry_count option or something, and if it fails then it re-triggers itself with one less retry count)? Then we wouldn't need to have this python script doing a long sleep here either.

Contributor Author:

In this script I try to restart ONLY workflows which failed on the AWS Start step, and only on this error.
Workflows that fail with this problem usually take up to 5 minutes.

Contributor:

ah, got it, so the retries only happen if the CI infra fails to start, i.e. transient connector test failures aren't covered?

(if yes then this seems OK to me, since we're only running this top-level action once per day)

Contributor Author:

Oops, sorry, I saw your question only now.
Yes, retries happen only for the "AWS Start" problem, and only for that problem.


run_uuids = search_failed_workflow_runs(ORGANIZATION, REPOSITORY, workflow_id, run_uuid_to_name.keys())
for run_uuid in run_uuids:
connector_name = run_uuid_to_name[run_uuid]
run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name)
logging.info(f"Re-dispatch workflow for connector {connector_name}, UUID: {run_uuid}")


if __name__ == "__main__":
logging.basicConfig(format=LOGGING_FORMAT, level=logging.INFO)
main()
102 changes: 0 additions & 102 deletions tools/bin/ci_integration_workflow_launcher.sh

This file was deleted.