From 9f8c8d23ed674fc8c10601d65f0b05e01dd48407 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Wed, 6 Jul 2022 17:28:11 +0300 Subject: [PATCH 01/20] ci_integration_workflow_launcher.py added (raw version) Signed-off-by: Sergey Chvalyuk --- tools/bin/ci_integration_workflow_launcher.py | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 tools/bin/ci_integration_workflow_launcher.py diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py new file mode 100644 index 000000000000..e9d6a79eb82d --- /dev/null +++ b/tools/bin/ci_integration_workflow_launcher.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import os +import sys +import requests +import subprocess + +REPO_API = "https://api.github.com/repos/airbytehq/airbyte" +TEST_COMMAND = ".github/workflows/test-command.yml" +MAX_RUNNING_MASTER_WORKFLOWS = 5 + + +GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN") +if not GITHUB_TOKEN: + print("GITHUB_TOKEN not set...") + sys.exit(1) + +response = requests.get( + REPO_API + "/actions/workflows", + headers={"Authorization": "Bearer " + GITHUB_TOKEN}) +response.raise_for_status() +response_json = response.json() + +for workflow in response_json["workflows"]: + if workflow["path"] == TEST_COMMAND: + workflow_id = workflow["id"] + +print(workflow_id) + +response = requests.get( + REPO_API + f"/actions/workflows/{workflow_id}/runs?branch=master&status=in_progress", + headers={"Authorization": "Bearer " + GITHUB_TOKEN}) + +response_json = response.json() +total_count = response_json["total_count"] + +print(total_count) + +process = subprocess.run( + ["./gradlew", "integrationTest", "--dry-run"], check=True, capture_output=True, universal_newlines=True) + +def get_connector_names(output): + names = [] + lines = output.splitlines() + for line in lines: + if "integrationTest SKIPPED" in line: + res = line.split(":") + if res[1] == "airbyte-integrations" and res[2] == "connectors": + names.append(res[3]) + return names + + +names = get_connector_names(process.stdout) + +for name in names: + print(REPO_API + f"/actions/workflows/{workflow_id}/dispatches") + """ + response = requests.post( + REPO_API + f"/actions/workflows/{workflow_id}/dispatches", + headers={"Authorization": "Bearer " + GITHUB_TOKEN}, + json={"ref": "master", "inputs": {"connector": name}}) + """ From 377cc710fc01803ed5b72503520b6c62ce0ea82d Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Wed, 6 Jul 2022 18:11:48 +0300 Subject: [PATCH 02/20] UUID added Signed-off-by: Sergey Chvalyuk --- .github/workflows/test-command.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 1b00e6e649a6..25a0cbb4cba5 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -16,6 +16,9 @@ on: comment-id: description: "The comment-id of the slash command. Used to update the comment with the status." 
required: false + uuid: + description: "UUID" + required: false jobs: find_valid_pat: @@ -25,6 +28,7 @@ jobs: outputs: pat: ${{ steps.variables.outputs.pat }} steps: + - name: UUID ${{ github.event.inputs.uuid }} - name: Checkout Airbyte uses: actions/checkout@v2 - name: Check PAT rate limits From edad06a8dc583bc49d5e38307aff07870afefc98 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Wed, 6 Jul 2022 18:19:02 +0300 Subject: [PATCH 03/20] fix UUID Signed-off-by: Sergey Chvalyuk --- .github/workflows/test-command.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 25a0cbb4cba5..3d6339a03ac9 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -29,6 +29,7 @@ jobs: pat: ${{ steps.variables.outputs.pat }} steps: - name: UUID ${{ github.event.inputs.uuid }} + run: echo UUID ${{ github.event.inputs.uuid }} - name: Checkout Airbyte uses: actions/checkout@v2 - name: Check PAT rate limits From 5ec3b819088071b9a3226e8bb21c3b9c2fef6099 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Thu, 7 Jul 2022 12:52:43 +0300 Subject: [PATCH 04/20] ci_integration_workflow_launcher.py updated Signed-off-by: Sergey Chvalyuk --- tools/bin/ci_integration_workflow_launcher.py | 182 ++++++++++++++---- 1 file changed, 142 insertions(+), 40 deletions(-) diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py index e9d6a79eb82d..08b4f5edb60a 100644 --- a/tools/bin/ci_integration_workflow_launcher.py +++ b/tools/bin/ci_integration_workflow_launcher.py @@ -3,63 +3,165 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import datetime +import logging import os +import re +import subprocess import sys +import uuid +from urllib.parse import parse_qsl, urljoin, urlparse + import requests -import subprocess -REPO_API = "https://api.github.com/repos/airbytehq/airbyte" -TEST_COMMAND = ".github/workflows/test-command.yml" -MAX_RUNNING_MASTER_WORKFLOWS = 5 +LOGGING_FORMAT = "%(asctime)-15s %(levelname)s %(message)s" +API_URL = "https://api.github.com" +BRANCH = "grubberr/14450-connector-integration-tests" +WORKFLOW_PATH = ".github/workflows/test-command.yml" +RUN_ID_REGEX = re.compile(r"UUID ([0-9a-f-]+)") GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN") if not GITHUB_TOKEN: - print("GITHUB_TOKEN not set...") + logging.error("GITHUB_TOKEN not set...") sys.exit(1) -response = requests.get( - REPO_API + "/actions/workflows", - headers={"Authorization": "Bearer " + GITHUB_TOKEN}) -response.raise_for_status() -response_json = response.json() -for workflow in response_json["workflows"]: - if workflow["path"] == TEST_COMMAND: - workflow_id = workflow["id"] +def get_response(url_or_path, params=None): + url = urljoin(API_URL, url_or_path) + response = requests.get(url, params=params, headers={"Authorization": "Bearer " + GITHUB_TOKEN}) + response.raise_for_status() + return response -print(workflow_id) -response = requests.get( - REPO_API + f"/actions/workflows/{workflow_id}/runs?branch=master&status=in_progress", - headers={"Authorization": "Bearer " + GITHUB_TOKEN}) +def get_response_json(url_or_path, params=None): + response = get_response(url_or_path, params=params) + return response.json() -response_json = response.json() -total_count = response_json["total_count"] -print(total_count) +def get_workflow_id(owner, repo, path): + response_json = get_response_json(f"/repos/{owner}/{repo}/actions/workflows") + for workflow in response_json["workflows"]: + 
if workflow["path"] == path: + return workflow["id"] -process = subprocess.run( - ["./gradlew", "integrationTest", "--dry-run"], check=True, capture_output=True, universal_newlines=True) -def get_connector_names(output): - names = [] - lines = output.splitlines() - for line in lines: - if "integrationTest SKIPPED" in line: - res = line.split(":") - if res[1] == "airbyte-integrations" and res[2] == "connectors": - names.append(res[3]) - return names +def workflow_dispatch(owner, repo, workflow_id, connector): + run_id = str(uuid.uuid4()) + url = urljoin(API_URL, f"/repos/{owner}/{repo}/actions/workflows/{workflow_id}/dispatches") + response = requests.post( + url, headers={"Authorization": "Bearer " + GITHUB_TOKEN}, json={"ref": BRANCH, "inputs": {"connector": connector, "uuid": run_id}} + ) + response.raise_for_status() + return run_id -names = get_connector_names(process.stdout) +def get_connector_names(): + process = subprocess.run(["./gradlew", "integrationTest", "--dry-run"], check=True, capture_output=True, universal_newlines=True) -for name in names: - print(REPO_API + f"/actions/workflows/{workflow_id}/dispatches") - """ - response = requests.post( - REPO_API + f"/actions/workflows/{workflow_id}/dispatches", - headers={"Authorization": "Bearer " + GITHUB_TOKEN}, - json={"ref": "master", "inputs": {"connector": name}}) - """ + res = [] + for line in process.stdout.splitlines(): + parts = line.split(":") + if ( + len(parts) >= 4 + and parts[1] == "airbyte-integrations" + and parts[2] in ["connectors", "bases"] + and parts[-1] == "integrationTest SKIPPED" + ): + res.append(parts[3]) + return res + + +def iter_workflow_runs(owner, repo, per_page=100): + path = f"/repos/{owner}/{repo}/actions/runs" + page = None + while True: + params = {"per_page": per_page} + if page: + params["page"] = page + response = get_response(path, params=params) + response_json = response.json() + for workflow_run in response_json["workflow_runs"]: + yield workflow_run + if "next" not in response.links: + break + page = dict(parse_qsl(urlparse(response.links["next"]["url"]).query))["page"] + + +def get_job_run_id(jobs): + if jobs and len(jobs[0]["steps"]) >= 2: + return jobs[0]["steps"][1]["name"] + + +def get_job_start_aws(jobs): + if ( + len(jobs) >= 2 + and len(jobs[1]["steps"]) >= 3 + and jobs[1]["steps"][2]["name"] == "Start AWS Runner" + and jobs[1]["steps"][2]["conclusion"] == "failure" + ): + return True + + +def search_workflow_runs(owner, repo, workflow_id, run_ids): + run_ids = set(run_ids) + now = datetime.datetime.utcnow() + res = set() + for workflow_run in iter_workflow_runs(owner, repo): + + if not run_ids: + break + + created_at = datetime.datetime.strptime(workflow_run["created_at"], "%Y-%m-%dT%H:%M:%SZ") + period = now - created_at + if period.days >= 1: + break + + if workflow_run["workflow_id"] != workflow_id: + continue + if workflow_run["head_branch"] != BRANCH: + continue + + response_json = get_response_json(workflow_run["jobs_url"]) + job_run_id_label = get_job_run_id(response_json["jobs"]) + if not job_run_id_label: + continue + + run_id = None + m = re.match(RUN_ID_REGEX, job_run_id_label) + if m: + run_id = m.groups()[0] + + if not run_id: + continue + + if run_id in run_ids: + run_ids.remove(run_id) + if get_job_start_aws(response_json["jobs"]): + res.add(run_id) + return res + + +def main(): + workflow_id = get_workflow_id("airbytehq", "airbyte", WORKFLOW_PATH) + if not workflow_id: + logging.error(f"Cannot find workflow path '{WORKFLOW_PATH}'") + sys.exit(1) + + connector_names 
= get_connector_names() + run_id_to_name = {} + for connector_name in connector_names: + logging.info(f"Dispatch workflow for connector {connector_name}") + run_id = workflow_dispatch("airbytehq", "airbyte", workflow_id, connector_name) + run_id_to_name[run_id] = connector_name + + res = search_workflow_runs("airbytehq", "airbyte", workflow_id, run_id_to_name.keys()) + for run_id in res: + connector_name = run_id_to_name[run_id] + logging.info(f"Dispatch workflow for connector {connector_name}") + workflow_dispatch("airbytehq", "airbyte", workflow_id, connector_name) + + +if __name__ == "__main__": + logging.basicConfig(format=LOGGING_FORMAT, level=logging.INFO) + main() From 5cd15ebb2b5b10f77edcf97b690834c5b53b16b2 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Thu, 7 Jul 2022 13:25:22 +0300 Subject: [PATCH 05/20] ORGANIZATION, REPOSITORY Signed-off-by: Sergey Chvalyuk --- tools/bin/ci_integration_workflow_launcher.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py index 08b4f5edb60a..28eb74a254a2 100644 --- a/tools/bin/ci_integration_workflow_launcher.py +++ b/tools/bin/ci_integration_workflow_launcher.py @@ -14,6 +14,8 @@ import requests +ORGANIZATION = "airbytehq" +REPOSITORY = "airbyte" LOGGING_FORMAT = "%(asctime)-15s %(levelname)s %(message)s" API_URL = "https://api.github.com" BRANCH = "grubberr/14450-connector-integration-tests" @@ -143,7 +145,7 @@ def search_workflow_runs(owner, repo, workflow_id, run_ids): def main(): - workflow_id = get_workflow_id("airbytehq", "airbyte", WORKFLOW_PATH) + workflow_id = get_workflow_id(ORGANIZATION, REPOSITORY, WORKFLOW_PATH) if not workflow_id: logging.error(f"Cannot find workflow path '{WORKFLOW_PATH}'") sys.exit(1) @@ -152,14 +154,14 @@ def main(): run_id_to_name = {} for connector_name in connector_names: logging.info(f"Dispatch workflow for connector {connector_name}") - run_id = workflow_dispatch("airbytehq", "airbyte", workflow_id, connector_name) + run_id = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name) run_id_to_name[run_id] = connector_name - res = search_workflow_runs("airbytehq", "airbyte", workflow_id, run_id_to_name.keys()) + res = search_workflow_runs(ORGANIZATION, REPOSITORY, workflow_id, run_id_to_name.keys()) for run_id in res: connector_name = run_id_to_name[run_id] logging.info(f"Dispatch workflow for connector {connector_name}") - workflow_dispatch("airbytehq", "airbyte", workflow_id, connector_name) + workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name) if __name__ == "__main__": From 206835d90d87246919a819600b78cdb353d5beca Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Thu, 7 Jul 2022 13:37:27 +0300 Subject: [PATCH 06/20] run_id -> run_uuid Signed-off-by: Sergey Chvalyuk --- tools/bin/ci_integration_workflow_launcher.py | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py index 28eb74a254a2..ef53616365bf 100644 --- a/tools/bin/ci_integration_workflow_launcher.py +++ b/tools/bin/ci_integration_workflow_launcher.py @@ -49,13 +49,13 @@ def get_workflow_id(owner, repo, path): def workflow_dispatch(owner, repo, workflow_id, connector): - run_id = str(uuid.uuid4()) + run_uuid = str(uuid.uuid4()) url = urljoin(API_URL, f"/repos/{owner}/{repo}/actions/workflows/{workflow_id}/dispatches") response = requests.post( - url, 
headers={"Authorization": "Bearer " + GITHUB_TOKEN}, json={"ref": BRANCH, "inputs": {"connector": connector, "uuid": run_id}} + url, headers={"Authorization": "Bearer " + GITHUB_TOKEN}, json={"ref": BRANCH, "inputs": {"connector": connector, "uuid": run_uuid}} ) response.raise_for_status() - return run_id + return run_uuid def get_connector_names(): @@ -90,7 +90,7 @@ def iter_workflow_runs(owner, repo, per_page=100): page = dict(parse_qsl(urlparse(response.links["next"]["url"]).query))["page"] -def get_job_run_id(jobs): +def get_job_run_uuid(jobs): if jobs and len(jobs[0]["steps"]) >= 2: return jobs[0]["steps"][1]["name"] @@ -105,13 +105,13 @@ def get_job_start_aws(jobs): return True -def search_workflow_runs(owner, repo, workflow_id, run_ids): - run_ids = set(run_ids) +def search_workflow_runs(owner, repo, workflow_id, run_uuids): + run_uuids = set(run_uuids) now = datetime.datetime.utcnow() res = set() for workflow_run in iter_workflow_runs(owner, repo): - if not run_ids: + if not run_uuids: break created_at = datetime.datetime.strptime(workflow_run["created_at"], "%Y-%m-%dT%H:%M:%SZ") @@ -125,22 +125,22 @@ def search_workflow_runs(owner, repo, workflow_id, run_ids): continue response_json = get_response_json(workflow_run["jobs_url"]) - job_run_id_label = get_job_run_id(response_json["jobs"]) - if not job_run_id_label: + job_run_uuid_label = get_job_run_uuid(response_json["jobs"]) + if not job_run_uuid_label: continue - run_id = None - m = re.match(RUN_ID_REGEX, job_run_id_label) + run_uuid = None + m = re.match(RUN_ID_REGEX, job_run_uuid_label) if m: - run_id = m.groups()[0] + run_uuid = m.groups()[0] - if not run_id: + if not run_uuid: continue - if run_id in run_ids: - run_ids.remove(run_id) + if run_uuid in run_uuids: + run_uuids.remove(run_uuid) if get_job_start_aws(response_json["jobs"]): - res.add(run_id) + res.add(run_uuid) return res @@ -151,17 +151,17 @@ def main(): sys.exit(1) connector_names = get_connector_names() - run_id_to_name = {} + run_uuid_to_name = {} for connector_name in connector_names: - logging.info(f"Dispatch workflow for connector {connector_name}") - run_id = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name) - run_id_to_name[run_id] = connector_name - - res = search_workflow_runs(ORGANIZATION, REPOSITORY, workflow_id, run_id_to_name.keys()) - for run_id in res: - connector_name = run_id_to_name[run_id] - logging.info(f"Dispatch workflow for connector {connector_name}") - workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name) + run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name) + logging.info(f"Dispatch workflow for connector {connector_name}, UUID: {run_uuid}") + run_uuid_to_name[run_uuid] = connector_name + + res = search_workflow_runs(ORGANIZATION, REPOSITORY, workflow_id, run_uuid_to_name.keys()) + for run_uuid in res: + connector_name = run_uuid_to_name[run_uuid] + run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name) + logging.info(f"Re-dispatch workflow for connector {connector_name}, UUID: {run_uuid}") if __name__ == "__main__": From 905fe5398e0f84943d9e2acd18d2bb24127642d1 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Thu, 7 Jul 2022 13:55:15 +0300 Subject: [PATCH 07/20] test-command.yml updated Signed-off-by: Sergey Chvalyuk --- .github/workflows/test-command.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 1f378cbe7ca2..eb0679ac4218 100644 
--- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -17,7 +17,7 @@ on: description: "The comment-id of the slash command. Used to update the comment with the status." required: false uuid: - description: "UUID" + description: "Custom UUID of Workflow run. Used because GitHub dispatches endpoint does not return workflow run id." required: false jobs: @@ -29,7 +29,7 @@ jobs: pat: ${{ steps.variables.outputs.pat }} steps: - name: UUID ${{ github.event.inputs.uuid }} - run: echo UUID ${{ github.event.inputs.uuid }} + run: true - name: Checkout Airbyte uses: actions/checkout@v2 - name: Check PAT rate limits From a17aa2dd9f9268d561b907ce9010653fa1287478 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Thu, 7 Jul 2022 14:05:00 +0300 Subject: [PATCH 08/20] ci_integration_workflow_launcher.sh -> ci_integration_workflow_launcher.py Signed-off-by: Sergey Chvalyuk --- .github/workflows/connector_integration_tests.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/connector_integration_tests.yml b/.github/workflows/connector_integration_tests.yml index 2546b22af787..7768673664f1 100644 --- a/.github/workflows/connector_integration_tests.yml +++ b/.github/workflows/connector_integration_tests.yml @@ -18,7 +18,11 @@ jobs: uses: actions/setup-java@v1 with: java-version: '17' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install requests - name: Launch Integration Tests - run: ./tools/bin/ci_integration_workflow_launcher.sh + run: python ./tools/bin/ci_integration_workflow_launcher.py env: GITHUB_TOKEN: ${{ secrets.SLASH_COMMAND_PAT }} From c75c2a3eb739ef196181e0745cb3ec858620a40d Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Thu, 7 Jul 2022 14:16:43 +0300 Subject: [PATCH 09/20] UUID regex updated Signed-off-by: Sergey Chvalyuk --- tools/bin/ci_integration_workflow_launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py index ef53616365bf..8e07d98ed23f 100644 --- a/tools/bin/ci_integration_workflow_launcher.py +++ b/tools/bin/ci_integration_workflow_launcher.py @@ -20,7 +20,7 @@ API_URL = "https://api.github.com" BRANCH = "grubberr/14450-connector-integration-tests" WORKFLOW_PATH = ".github/workflows/test-command.yml" -RUN_ID_REGEX = re.compile(r"UUID ([0-9a-f-]+)") +RUN_ID_REGEX = re.compile("^UUID ([0-9a-f-]+)$") GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN") From fc6d7d1a857f82be0d4345d68b8dad70cd5daaaa Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Thu, 7 Jul 2022 14:17:33 +0300 Subject: [PATCH 10/20] RUN_ID_REGEX -> RUN_UUID_REGEX Signed-off-by: Sergey Chvalyuk --- tools/bin/ci_integration_workflow_launcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py index 8e07d98ed23f..d9b2651682ae 100644 --- a/tools/bin/ci_integration_workflow_launcher.py +++ b/tools/bin/ci_integration_workflow_launcher.py @@ -20,7 +20,7 @@ API_URL = "https://api.github.com" BRANCH = "grubberr/14450-connector-integration-tests" WORKFLOW_PATH = ".github/workflows/test-command.yml" -RUN_ID_REGEX = re.compile("^UUID ([0-9a-f-]+)$") +RUN_UUID_REGEX = re.compile("^UUID ([0-9a-f-]+)$") GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN") @@ -130,7 +130,7 @@ def search_workflow_runs(owner, repo, workflow_id, run_uuids): continue run_uuid = None - m = re.match(RUN_ID_REGEX, 
job_run_uuid_label)
+        m = re.match(RUN_UUID_REGEX, job_run_uuid_label)
         if m:
             run_uuid = m.groups()[0]

From 6f01a5f633ba8376adbd09d95842d8ece7c3014d Mon Sep 17 00:00:00 2001
From: Sergey Chvalyuk
Date: Thu, 7 Jul 2022 17:35:37 +0300
Subject: [PATCH 11/20] check_start_aws_runner_failed

Signed-off-by: Sergey Chvalyuk
---
 .github/workflows/test-command.yml            |  2 +-
 tools/bin/ci_integration_workflow_launcher.py | 58 +++++++++++--------
 2 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml
index eb0679ac4218..ca948abfc0fa 100644
--- a/.github/workflows/test-command.yml
+++ b/.github/workflows/test-command.yml
@@ -17,7 +17,7 @@ on:
         description: "The comment-id of the slash command. Used to update the comment with the status."
         required: false
       uuid:
-        description: "Custom UUID of Workflow run. Used because GitHub dispatches endpoint does not return workflow run id."
+        description: "Custom UUID of workflow run. Used because the GitHub dispatches endpoint does not return the workflow run id."
         required: false

 jobs:
diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py
index d9b2651682ae..b4bf8b182015 100644
--- a/tools/bin/ci_integration_workflow_launcher.py
+++ b/tools/bin/ci_integration_workflow_launcher.py
@@ -29,6 +29,37 @@
     sys.exit(1)


+def check_start_aws_runner_failed(jobs):
+    """
+    !!! WARNING !!! WARNING !!! WARNING !!!
+    !!! WARNING !!! WARNING !!! WARNING !!!
+    !!! WARNING !!! WARNING !!! WARNING !!!
+
+    If the structure of workflow {WORKFLOW_PATH} changes in the future,
+    there is a chance that we will need to update this function too.
+    """
+    return (
+        len(jobs) >= 2
+        and len(jobs[1]["steps"]) >= 3
+        and jobs[1]["steps"][2]["name"] == "Start AWS Runner"
+        and jobs[1]["steps"][2]["conclusion"] == "failure"
+    )
+
+
+def get_run_uuid(jobs):
+    """
+    This function relies on the assumption that the first step of the first job is:
+
+    - name: UUID ${{ github.event.inputs.uuid }}
+      run: true
+    """
+    if jobs and len(jobs[0]["steps"]) >= 2:
+        name = jobs[0]["steps"][1]["name"]
+        m = re.match(RUN_UUID_REGEX, name)
+        if m:
+            return m.groups()[0]
+
+
 def get_response(url_or_path, params=None):
     url = urljoin(API_URL, url_or_path)
     response = requests.get(url, params=params, headers={"Authorization": "Bearer " + GITHUB_TOKEN})
@@ -90,21 +121,6 @@ def iter_workflow_runs(owner, repo, per_page=100):
         page = dict(parse_qsl(urlparse(response.links["next"]["url"]).query))["page"]


-def get_job_run_uuid(jobs):
-    if jobs and len(jobs[0]["steps"]) >= 2:
-        return jobs[0]["steps"][1]["name"]
-
-
-def get_job_start_aws(jobs):
-    if (
-        len(jobs) >= 2
-        and len(jobs[1]["steps"]) >= 3
-        and jobs[1]["steps"][2]["name"] == "Start AWS Runner"
-        and jobs[1]["steps"][2]["conclusion"] == "failure"
-    ):
-        return True
-
-
 def search_workflow_runs(owner, repo, workflow_id, run_uuids):
     run_uuids = set(run_uuids)
     now = datetime.datetime.utcnow()
@@ -125,20 +141,12 @@ def search_workflow_runs(owner, repo, workflow_id, run_uuids):
             continue

         response_json = get_response_json(workflow_run["jobs_url"])
-        job_run_uuid_label = get_job_run_uuid(response_json["jobs"])
-        if not job_run_uuid_label:
-            continue
-
-        run_uuid = None
-        m = re.match(RUN_UUID_REGEX, job_run_uuid_label)
-        if m:
-            run_uuid = m.groups()[0]
-
+        run_uuid = get_run_uuid(response_json["jobs"])
         if not run_uuid:
             continue

         if run_uuid in run_uuids:
             run_uuids.remove(run_uuid)
-            if get_job_start_aws(response_json["jobs"]):
+            if check_start_aws_runner_failed(response_json["jobs"]):
                 res.add(run_uuid)
     return res

From e53d4626cd88320685190e6363030d084908c056 Mon Sep 17 00:00:00 2001
From: Sergey Chvalyuk
Date: Thu, 7 Jul 2022 18:09:47 +0300
Subject: [PATCH 12/20] SLEEP = 1200 added

Signed-off-by: Sergey Chvalyuk
---
 tools/bin/ci_integration_workflow_launcher.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py
index b4bf8b182015..86127f2f2ea6 100644
--- a/tools/bin/ci_integration_workflow_launcher.py
+++ b/tools/bin/ci_integration_workflow_launcher.py
@@ -9,6 +9,7 @@
 import re
 import subprocess
 import sys
+import time
 import uuid
 from urllib.parse import parse_qsl, urljoin, urlparse

 import requests
@@ -21,6 +22,7 @@
 BRANCH = "grubberr/14450-connector-integration-tests"
 WORKFLOW_PATH = ".github/workflows/test-command.yml"
 RUN_UUID_REGEX = re.compile("^UUID ([0-9a-f-]+)$")
+SLEEP = 1200


 GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
@@ -165,8 +167,11 @@ def main():
         run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name)
         logging.info(f"Dispatch workflow for connector {connector_name}, UUID: {run_uuid}")
         run_uuid_to_name[run_uuid] = connector_name
-    res = search_workflow_runs(ORGANIZATION, REPOSITORY, workflow_id, run_uuid_to_name.keys())
-    for run_uuid in res:
+    logging.info(f"Sleeping {SLEEP} seconds")
+    time.sleep(SLEEP)
+
+    run_uuids = search_workflow_runs(ORGANIZATION, REPOSITORY, workflow_id, run_uuid_to_name.keys())
+    for run_uuid in run_uuids:
         connector_name = run_uuid_to_name[run_uuid]
         run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name)
         logging.info(f"Re-dispatch workflow for connector {connector_name}, UUID: {run_uuid}")

From f49e53a32b96efd955c820cf43d70dc7a5e29c23 Mon Sep 17 00:00:00 2001
From: Sergey Chvalyuk
Date: Thu, 7 Jul 2022 18:28:06 +0300
Subject: [PATCH 13/20] scan backward 3 hours max

Signed-off-by: Sergey Chvalyuk
---
 tools/bin/ci_integration_workflow_launcher.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py
index 86127f2f2ea6..156462c53a9a 100644
--- a/tools/bin/ci_integration_workflow_launcher.py
+++ b/tools/bin/ci_integration_workflow_launcher.py
@@ -128,13 +128,12 @@ def search_workflow_runs(owner, repo, workflow_id, run_uuids):
     now = datetime.datetime.utcnow()
     res = set()
     for workflow_run in iter_workflow_runs(owner, repo):
-
         if not run_uuids:
             break

         created_at = datetime.datetime.strptime(workflow_run["created_at"], "%Y-%m-%dT%H:%M:%SZ")
         period = now - created_at
-        if period.days >= 1:
+        if period.total_seconds() > 10800:
             break

         if workflow_run["workflow_id"] != workflow_id:

From d4ad73c2a62f7ef54bb4a9dc838aa2aff8557fd0 Mon Sep 17 00:00:00 2001
From: Sergey Chvalyuk
Date: Thu, 7 Jul 2022 18:59:14 +0300
Subject: [PATCH 14/20] BRANCH="master"

Signed-off-by: Sergey Chvalyuk
---
 tools/bin/ci_integration_workflow_launcher.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py
index 156462c53a9a..f3b0a55747eb 100644
--- a/tools/bin/ci_integration_workflow_launcher.py
+++ b/tools/bin/ci_integration_workflow_launcher.py
@@ -19,7 +19,7 @@
 REPOSITORY = "airbyte"
 LOGGING_FORMAT = "%(asctime)-15s %(levelname)s %(message)s"
 API_URL = "https://api.github.com"
-BRANCH = "grubberr/14450-connector-integration-tests"
+BRANCH = "master"
 WORKFLOW_PATH = ".github/workflows/test-command.yml"
 RUN_UUID_REGEX = re.compile("^UUID 
([0-9a-f-]+)$") SLEEP = 1200 From dd9d5524ba95f101d9d1c4b804ed70085ee51b5b Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Fri, 8 Jul 2022 08:49:28 +0300 Subject: [PATCH 15/20] "date" added for build_report.py Signed-off-by: Sergey Chvalyuk --- tools/bin/build_report.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/bin/build_report.py b/tools/bin/build_report.py index 96043badb77c..64963feb082e 100644 --- a/tools/bin/build_report.py +++ b/tools/bin/build_report.py @@ -92,7 +92,7 @@ def check_module(connector): elif connector.startswith("destination"): SUCCESS_DESTINATION.append(connector) else: - failed_today = [connector, short_status, last_build["link"]] + failed_today = [connector, short_status, last_build["link"], last_build["date"]] if len(history) > 1 and history[1]["status"] != "success": FAILED_2_LAST.append(failed_today) From 81c6e87ee8dac59e31b315e4136eb904a5ac9807 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Mon, 11 Jul 2022 18:17:36 +0300 Subject: [PATCH 16/20] search_workflow_runs -> search_failed_workflow_runs Signed-off-by: Sergey Chvalyuk --- tools/bin/ci_integration_workflow_launcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py index f3b0a55747eb..fd7330e7caa9 100644 --- a/tools/bin/ci_integration_workflow_launcher.py +++ b/tools/bin/ci_integration_workflow_launcher.py @@ -123,7 +123,7 @@ def iter_workflow_runs(owner, repo, per_page=100): page = dict(parse_qsl(urlparse(response.links["next"]["url"]).query))["page"] -def search_workflow_runs(owner, repo, workflow_id, run_uuids): +def search_failed_workflow_runs(owner, repo, workflow_id, run_uuids): run_uuids = set(run_uuids) now = datetime.datetime.utcnow() res = set() @@ -169,7 +169,7 @@ def main(): logging.info(f"Sleeping {SLEEP} seconds") time.sleep(SLEEP) - run_uuids = search_workflow_runs(ORGANIZATION, REPOSITORY, workflow_id, run_uuid_to_name.keys()) + run_uuids = search_failed_workflow_runs(ORGANIZATION, REPOSITORY, workflow_id, run_uuid_to_name.keys()) for run_uuid in run_uuids: connector_name = run_uuid_to_name[run_uuid] run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name) From 3fc3f7df1a600bb4325b13bd2d6feb846eb15c64 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Mon, 11 Jul 2022 18:32:23 +0300 Subject: [PATCH 17/20] time.sleep(1) added Signed-off-by: Sergey Chvalyuk --- tools/bin/ci_integration_workflow_launcher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py index fd7330e7caa9..2a2fb3910475 100644 --- a/tools/bin/ci_integration_workflow_launcher.py +++ b/tools/bin/ci_integration_workflow_launcher.py @@ -165,6 +165,8 @@ def main(): run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name) logging.info(f"Dispatch workflow for connector {connector_name}, UUID: {run_uuid}") run_uuid_to_name[run_uuid] = connector_name + # to avoid overloading system + time.sleep(1) logging.info(f"Sleeping {SLEEP} seconds") time.sleep(SLEEP) From a1ab675406723ebd1dc4c2a15d572d7ec5780a2e Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Tue, 12 Jul 2022 14:07:10 +0300 Subject: [PATCH 18/20] check only conclusion="failure" Signed-off-by: Sergey Chvalyuk --- tools/bin/ci_integration_workflow_launcher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/bin/ci_integration_workflow_launcher.py 
b/tools/bin/ci_integration_workflow_launcher.py index 2a2fb3910475..2e4f03709585 100644 --- a/tools/bin/ci_integration_workflow_launcher.py +++ b/tools/bin/ci_integration_workflow_launcher.py @@ -140,6 +140,8 @@ def search_failed_workflow_runs(owner, repo, workflow_id, run_uuids): continue if workflow_run["head_branch"] != BRANCH: continue + if workflow_run["conclusion"] != "failure": + continue response_json = get_response_json(workflow_run["jobs_url"]) run_uuid = get_run_uuid(response_json["jobs"]) From 0283ef2ad638a01096c29a0865eb781ac34a8f78 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Fri, 15 Jul 2022 18:02:06 +0300 Subject: [PATCH 19/20] ci_integration_workflow_launcher.sh - removed Signed-off-by: Sergey Chvalyuk --- tools/bin/ci_integration_workflow_launcher.sh | 102 ------------------ 1 file changed, 102 deletions(-) delete mode 100755 tools/bin/ci_integration_workflow_launcher.sh diff --git a/tools/bin/ci_integration_workflow_launcher.sh b/tools/bin/ci_integration_workflow_launcher.sh deleted file mode 100755 index 8678761c46dc..000000000000 --- a/tools/bin/ci_integration_workflow_launcher.sh +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env bash -# launches integration test workflows for master builds - -# Programmer note: testing this file is easy, -# set runtime variables inline -# GITHUB_TOKEN=$YOUR_TOKEN_HERE ./ci_integration_workflow_launcher.sh - -set -o errexit # -f exit for any command failure -set -o nounset # -u exit if a variable is not set -# text color escape codes (please note \033 == \e but OSX doesn't respect the \e) -blue_text='\033[94m' -red_text='\033[31m' -default_text='\033[39m' - -# set -x/xtrace' uses a Sony PS4 for more info -PS4="$blue_text""$BASH_SOURCE:$LINENO ""$default_text" -# set -o xtrace - - -if test -z "$GITHUB_TOKEN"; then - echo "GITHUB_TOKEN not set..." 
- exit 1 -fi - -repo_api=https://api.github.com/repos/airbytehq/airbyte -# workflow_path is hardcoded in a query because escaping strings is hard - -# --------- Get all workflows -workflow_ids_curl_response=$( - curl --silent \ - --show-error \ - --header "Authorization: Bearer $GITHUB_TOKEN" \ - --request GET "$repo_api/actions/workflows" -) -echo -e "$blue_text""\$workflow_ids_curl_response is \n\n$workflow_ids_curl_response\n\n""$default_text" -echo -e "$blue_text""Running jq on \$workflow_ids_curl_response""$default_text" -echo -e "$blue_text""jq '.workflows[] | select(.path==.github/workflows/test-command.yml) | .id'""$default_text" - -workflow_id=$(echo $workflow_ids_curl_response | \ - jq '.workflows[] | select(.path==".github/workflows/test-command.yml") | .id') - -# We expect a unique response, 2 responses is too much -workflows_with_matching_paths=$(echo $workflow_id | wc -l) -echo -e "$blue_text""jq returned: $workflow_id""$default_text" - -if test "$workflows_with_matching_paths" -ne "1" ; then - echo -e "\n\n$red_text""Unexpected number of workflows found != 1 for .github/workflows/test-command.yml""$default_text" - echo -e "\n\n$red_text""\$workflows_with_matching_paths = $workflows_with_matching_paths""$default_text" - exit 1 -fi - -# --------- Ensure no more than 5 concurrent tests happen - -max_running_master_workflows=5 - -running_master_workflows_curl_response=$( - curl \ - --silent \ - --show-error \ - --header "Authorization: Bearer $GITHUB_TOKEN" \ - --request GET "$repo_api/actions/workflows/$workflow_id/runs?branch=master&status=in_progress") - -echo -e "$blue_text""\$running_master_workflows_curl_response is \n\n$running_master_workflows_curl_response\n\n""$default_text" -echo -e "$blue_text""Running jq on response jq .total_count""$default_text" - -running_master_workflows=$(echo "$running_master_workflows_curl_response" | jq .total_count) -echo -e "$blue_text""JQ Found \"$running_master_workflows\" running""$default_text" - - -if test "$running_master_workflows" -gt "$max_running_master_workflows"; then - echo -e "$red_text""More than $max_running_master_workflows integration tests workflows running on master.""$default_text" - echo -e "$red_text""Skipping launching workflows. If you want this test run use manual steps!""$default_text" - exit 0 -else - echo -e "$blue_text""Running ./gradlew integrationTest --dry-run""$default_text" -fi - -# --------- Use gradle to find tests to fire - -./gradlew integrationTest --dry-run -gradle_test_list_result=$? -# Tell epople if Gradle Fails -if test $gradle_test_list_result -ne 0; then - echo -e "$red_text""Gradle FAILED to build! 
Try './gradlew integrationTest --dry-run' in your branch""$default_text" -else - echo -e "$blue_text""Gradle dry run SUCCESS!""$default_text" -fi - -# --------- Fire tests -connectors=$(./gradlew integrationTest --dry-run | grep 'integrationTest SKIPPED' | cut -d: -f 4 | sort | uniq) -for connector in $connectors; do - echo -e "$blue_text""Issuing GH action request for connector $connector...""$default_text" - curl \ - --silent \ - --show-error \ - --header "Accept: application/vnd.github.v3+json" \ - --header "Authorization: Bearer $GITHUB_TOKEN" \ - --request POST "$repo_api/actions/workflows/$workflow_id/dispatches" \ - --data "{\"ref\":\"master\", \"inputs\": { \"connector\": \"$connector\"}" -done - -echo "If you are reading this the file has finished executing" From 9307a6295a0ebbcc22f3374e832acfd5a1353a4a Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Mon, 1 Aug 2022 12:41:31 +0300 Subject: [PATCH 20/20] now we can pass integrations as CLI params Signed-off-by: Sergey Chvalyuk --- .../workflows/connector_integration_tests.yml | 4 +- tools/bin/ci_integration_workflow_launcher.py | 69 ++++++++++++++++--- 2 files changed, 61 insertions(+), 12 deletions(-) diff --git a/.github/workflows/connector_integration_tests.yml b/.github/workflows/connector_integration_tests.yml index 7768673664f1..00a5a16207f1 100644 --- a/.github/workflows/connector_integration_tests.yml +++ b/.github/workflows/connector_integration_tests.yml @@ -21,8 +21,8 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install requests + pip install PyYAML requests - name: Launch Integration Tests - run: python ./tools/bin/ci_integration_workflow_launcher.py + run: python ./tools/bin/ci_integration_workflow_launcher.py base-normalization source:beta source:GA destination:beta destination:GA env: GITHUB_TOKEN: ${{ secrets.SLASH_COMMAND_PAT }} diff --git a/tools/bin/ci_integration_workflow_launcher.py b/tools/bin/ci_integration_workflow_launcher.py index 2e4f03709585..28ada3125dea 100644 --- a/tools/bin/ci_integration_workflow_launcher.py +++ b/tools/bin/ci_integration_workflow_launcher.py @@ -11,9 +11,11 @@ import sys import time import uuid +from functools import lru_cache from urllib.parse import parse_qsl, urljoin, urlparse import requests +import yaml ORGANIZATION = "airbytehq" REPOSITORY = "airbyte" @@ -23,6 +25,9 @@ WORKFLOW_PATH = ".github/workflows/test-command.yml" RUN_UUID_REGEX = re.compile("^UUID ([0-9a-f-]+)$") SLEEP = 1200 +SOURCE_DEFINITIONS = "airbyte-config/init/src/main/resources/seed/source_definitions.yaml" +DESTINATION_DEFINITIONS = "./airbyte-config/init/src/main/resources/seed/destination_definitions.yaml" +STAGES = ["alpha", "beta", "generally_available"] GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN") @@ -91,9 +96,9 @@ def workflow_dispatch(owner, repo, workflow_id, connector): return run_uuid -def get_connector_names(): +@lru_cache +def get_gradlew_integrations(): process = subprocess.run(["./gradlew", "integrationTest", "--dry-run"], check=True, capture_output=True, universal_newlines=True) - res = [] for line in process.stdout.splitlines(): parts = line.split(":") @@ -107,6 +112,50 @@ def get_connector_names(): return res +@lru_cache +def get_definitions(definition_type): + assert definition_type in ["source", "destination"] + filename = SOURCE_DEFINITIONS + if definition_type == "destination": + filename = DESTINATION_DEFINITIONS + with open(filename) as fp: + return yaml.safe_load(fp) + + +def normalize_stage(stage): + stage = stage.lower() + if stage == 
"ga": + stage = "generally_available" + return stage + + +def get_integrations(names): + res = set() + for name in names: + parts = name.split(":") + if len(parts) == 2: + definition_type, stage = parts + stage = normalize_stage(stage) + if stage == "all": + for integration in get_gradlew_integrations(): + if integration.startswith(definition_type + "-"): + res.add(integration) + elif stage in STAGES: + for definition in get_definitions(definition_type): + if definition.get("releaseStage", "alpha") == stage: + res.add(definition["dockerRepository"].partition("/")[2]) + else: + logging.warning(f"unknown stage: '{stage}'") + else: + integration = parts[0] + airbyte_integrations = get_gradlew_integrations() + if integration in airbyte_integrations: + res.add(integration) + else: + logging.warning(f"integration not found: {integration}") + return res + + def iter_workflow_runs(owner, repo, per_page=100): path = f"/repos/{owner}/{repo}/actions/runs" page = None @@ -161,12 +210,12 @@ def main(): logging.error(f"Cannot find workflow path '{WORKFLOW_PATH}'") sys.exit(1) - connector_names = get_connector_names() + integration_names = get_integrations(sys.argv[1:]) run_uuid_to_name = {} - for connector_name in connector_names: - run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name) - logging.info(f"Dispatch workflow for connector {connector_name}, UUID: {run_uuid}") - run_uuid_to_name[run_uuid] = connector_name + for integration_name in integration_names: + run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, integration_name) + logging.info(f"Dispatch workflow for connector {integration_name}, UUID: {run_uuid}") + run_uuid_to_name[run_uuid] = integration_name # to avoid overloading system time.sleep(1) @@ -175,9 +224,9 @@ def main(): run_uuids = search_failed_workflow_runs(ORGANIZATION, REPOSITORY, workflow_id, run_uuid_to_name.keys()) for run_uuid in run_uuids: - connector_name = run_uuid_to_name[run_uuid] - run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, connector_name) - logging.info(f"Re-dispatch workflow for connector {connector_name}, UUID: {run_uuid}") + integration_name = run_uuid_to_name[run_uuid] + run_uuid = workflow_dispatch(ORGANIZATION, REPOSITORY, workflow_id, integration_name) + logging.info(f"Re-dispatch workflow for connector {integration_name}, UUID: {run_uuid}") if __name__ == "__main__":