diff --git a/airbyte-ci/connectors/pipelines/README.md b/airbyte-ci/connectors/pipelines/README.md index a241cff242d5..1fb5434237a2 100644 --- a/airbyte-ci/connectors/pipelines/README.md +++ b/airbyte-ci/connectors/pipelines/README.md @@ -259,8 +259,8 @@ flowchart TD | Option | Multiple | Default value | Description | | ------------------- | -------- | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `--skip-step/-x` | True | | Skip steps by id e.g. `-x unit -x acceptance` | | `--fail-fast` | False | False | Abort after any tests fail, rather than continuing to run additional tests. Use this setting to confirm a known bug is fixed (or not), or when you only require a pass/fail result. | -| `--fast-tests-only` | True | False | Run unit tests only, skipping integration tests or any tests explicitly tagged as slow. Use this for more frequent checks, when it is not feasible to run the entire test suite. | | `--code-tests-only` | True | False | Skip any tests not directly related to code updates. For instance, metadata checks, version bump checks, changelog verification, etc. Use this setting to help focus on code quality during development. | | `--concurrent-cat` | False | False | Make CAT tests run concurrently using pytest-xdist. Be careful about source or destination API rate limits. | @@ -502,6 +502,7 @@ This command runs the Python tests for a airbyte-ci poetry package. 
| Version | PR | Description | | ------- | ---------------------------------------------------------- | --------------------------------------------------------------------------------------------------------- | +| 2.11.0 | [#32188](https://github.com/airbytehq/airbyte/pull/32188) | Add -x option to connector test to allow for skipping steps | | 2.10.12 | [#33419](https://github.com/airbytehq/airbyte/pull/33419) | Make ClickPipelineContext handle dagger logging. | | 2.10.11 | [#33497](https://github.com/airbytehq/airbyte/pull/33497) | Consider nested .gitignore rules in format. | | 2.10.10 | [#33449](https://github.com/airbytehq/airbyte/pull/33449) | Add generated metadata models to the default format ignore list. | diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/consts.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/consts.py new file mode 100644 index 000000000000..10e00bc67dad --- /dev/null +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/consts.py @@ -0,0 +1,26 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +from enum import Enum + + +class CONNECTOR_TEST_STEP_ID(str, Enum): + """ + An enum for the different step ids of the connector test pipeline. 
+ """ + + ACCEPTANCE = "acceptance" + BUILD_NORMALIZATION = "build_normalization" + BUILD_TAR = "build_tar" + BUILD = "build" + CHECK_BASE_IMAGE = "check_base_image" + INTEGRATION = "integration" + METADATA_VALIDATION = "metadata_validation" + QA_CHECKS = "qa_checks" + UNIT = "unit" + VERSION_FOLLOW_CHECK = "version_follow_check" + VERSION_INC_CHECK = "version_inc_check" + TEST_ORCHESTRATOR = "test_orchestrator" + DEPLOY_ORCHESTRATOR = "deploy_orchestrator" + + def __str__(self) -> str: + return self.value diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py index 3553cafa8ac1..440801a1759e 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py @@ -6,7 +6,7 @@ from datetime import datetime from types import TracebackType -from typing import Iterable, Optional +from typing import Iterable, List, Optional import yaml from anyio import Path @@ -18,6 +18,7 @@ from pipelines.dagger.actions import secrets from pipelines.helpers.connectors.modifed import ConnectorWithModifiedFiles from pipelines.helpers.github import update_commit_status_check +from pipelines.helpers.run_steps import RunStepOptions from pipelines.helpers.slack import send_message_to_webhook from pipelines.helpers.utils import METADATA_FILE_NAME from pipelines.models.contexts.pipeline_context import PipelineContext @@ -50,8 +51,6 @@ def __init__( reporting_slack_channel: Optional[str] = None, pull_request: PullRequest = None, should_save_report: bool = True, - fail_fast: bool = False, - fast_tests_only: bool = False, code_tests_only: bool = False, use_local_cdk: bool = False, use_host_gradle_dist_tar: bool = False, @@ -61,6 +60,7 @@ def __init__( s3_build_cache_access_key_id: Optional[str] = None, s3_build_cache_secret_key: Optional[str] = None, concurrent_cat: Optional[bool] = False, 
+ run_step_options: RunStepOptions = RunStepOptions(), targeted_platforms: Optional[Iterable[Platform]] = BUILD_PLATFORMS, ): """Initialize a connector context. @@ -80,8 +80,6 @@ def __init__( slack_webhook (Optional[str], optional): The slack webhook to send messages to. Defaults to None. reporting_slack_channel (Optional[str], optional): The slack channel to send messages to. Defaults to None. pull_request (PullRequest, optional): The pull request object if the pipeline was triggered by a pull request. Defaults to None. - fail_fast (bool, optional): Whether to fail fast. Defaults to False. - fast_tests_only (bool, optional): Whether to run only fast tests. Defaults to False. code_tests_only (bool, optional): Whether to ignore non-code tests like QA and metadata checks. Defaults to False. use_host_gradle_dist_tar (bool, optional): Used when developing java connectors with gradle. Defaults to False. enable_report_auto_open (bool, optional): Open HTML report in browser window. Defaults to True. 
@@ -102,8 +100,6 @@ def __init__( self._updated_secrets_dir = None self.cdk_version = None self.should_save_report = should_save_report - self.fail_fast = fail_fast - self.fast_tests_only = fast_tests_only self.code_tests_only = code_tests_only self.use_local_cdk = use_local_cdk self.use_host_gradle_dist_tar = use_host_gradle_dist_tar @@ -113,6 +109,7 @@ def __init__( self.s3_build_cache_access_key_id = s3_build_cache_access_key_id self.s3_build_cache_secret_key = s3_build_cache_secret_key self.concurrent_cat = concurrent_cat + self._connector_secrets = None self.targeted_platforms = targeted_platforms super().__init__( @@ -131,6 +128,7 @@ def __init__( ci_gcs_credentials=ci_gcs_credentials, ci_git_user=ci_git_user, ci_github_access_token=ci_github_access_token, + run_step_options=run_step_options, enable_report_auto_open=enable_report_auto_open, ) @@ -210,6 +208,11 @@ def docker_hub_password_secret(self) -> Optional[Secret]: return None return self.dagger_client.set_secret("docker_hub_password", self.docker_hub_password) + async def get_connector_secrets(self): + if self._connector_secrets is None: + self._connector_secrets = await secrets.get_connector_secrets(self) + return self._connector_secrets + async def get_connector_dir(self, exclude=None, include=None) -> Directory: """Get the connector under test source code directory. 
diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py index b858ca839e00..00bdffeabccf 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py @@ -3,15 +3,18 @@ # import sys +from typing import List import asyncclick as click from pipelines import main_logger +from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID from pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.airbyte_ci.connectors.pipeline import run_connectors_pipelines from pipelines.airbyte_ci.connectors.test.pipeline import run_connector_test_pipeline from pipelines.cli.dagger_pipeline_command import DaggerPipelineCommand from pipelines.consts import LOCAL_BUILD_PLATFORM, ContextState from pipelines.helpers.github import update_global_commit_status_check_for_tests +from pipelines.helpers.run_steps import RunStepOptions from pipelines.helpers.utils import fail_if_missing_docker_hub_creds @@ -30,13 +33,6 @@ type=bool, is_flag=True, ) -@click.option( - "--fast-tests-only", - help="When enabled, slow tests are skipped.", - default=False, - type=bool, - is_flag=True, -) @click.option( "--concurrent-cat", help="When enabled, the CAT tests will run concurrently. Be careful about rate limits", @@ -44,13 +40,20 @@ type=bool, is_flag=True, ) +@click.option( + "--skip-step", + "-x", + multiple=True, + type=click.Choice([step_id.value for step_id in CONNECTOR_TEST_STEP_ID]), + help="Skip a step by name. Can be used multiple times to skip multiple steps.", +) @click.pass_context async def test( ctx: click.Context, code_tests_only: bool, fail_fast: bool, - fast_tests_only: bool, concurrent_cat: bool, + skip_step: List[str], ) -> bool: """Runs a test pipeline for the selected connectors. 
@@ -70,6 +73,11 @@ async def test( update_global_commit_status_check_for_tests(ctx.obj, "success") return True + run_step_options = RunStepOptions( + fail_fast=fail_fast, + skip_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in skip_step], + ) + connectors_tests_contexts = [ ConnectorContext( pipeline_name=f"Testing connector {connector.technical_name}", @@ -86,8 +94,6 @@ async def test( ci_context=ctx.obj.get("ci_context"), pull_request=ctx.obj.get("pull_request"), ci_gcs_credentials=ctx.obj["ci_gcs_credentials"], - fail_fast=fail_fast, - fast_tests_only=fast_tests_only, code_tests_only=code_tests_only, use_local_cdk=ctx.obj.get("use_local_cdk"), s3_build_cache_access_key_id=ctx.obj.get("s3_build_cache_access_key_id"), @@ -95,10 +101,12 @@ async def test( docker_hub_username=ctx.obj.get("docker_hub_username"), docker_hub_password=ctx.obj.get("docker_hub_password"), concurrent_cat=concurrent_cat, + run_step_options=run_step_options, targeted_platforms=[LOCAL_BUILD_PLATFORM], ) for connector in ctx.obj["selected_connectors_with_modified_files"] ] + try: await run_connectors_pipelines( [connector_context for connector_context in connectors_tests_contexts], diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py index df5a4d28888c..424cc8dd87d0 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py @@ -3,101 +3,68 @@ # """This module groups factory like functions to dispatch tests steps according to the connector under test language.""" -import itertools from typing import List import anyio -import asyncer -from connector_ops.utils import METADATA_FILE_NAME, ConnectorLanguage +from connector_ops.utils import ConnectorLanguage +from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID from 
pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.airbyte_ci.connectors.reports import ConnectorReport from pipelines.airbyte_ci.connectors.test.steps import java_connectors, python_connectors from pipelines.airbyte_ci.connectors.test.steps.common import QaChecks, VersionFollowsSemverCheck, VersionIncrementCheck from pipelines.airbyte_ci.metadata.pipeline import MetadataValidation -from pipelines.models.steps import StepResult +from pipelines.helpers.run_steps import StepToRun, run_steps LANGUAGE_MAPPING = { - "run_all_tests": { - ConnectorLanguage.PYTHON: python_connectors.run_all_tests, - ConnectorLanguage.LOW_CODE: python_connectors.run_all_tests, - ConnectorLanguage.JAVA: java_connectors.run_all_tests, - } + "get_test_steps": { + ConnectorLanguage.PYTHON: python_connectors.get_test_steps, + ConnectorLanguage.LOW_CODE: python_connectors.get_test_steps, + ConnectorLanguage.JAVA: java_connectors.get_test_steps, + }, } -async def run_metadata_validation(context: ConnectorContext) -> List[StepResult]: - """Run the metadata validation on a connector. - Args: - context (ConnectorContext): The current connector context. - - Returns: - List[StepResult]: The results of the metadata validation steps. - """ - return [await MetadataValidation(context).run()] - - -async def run_version_checks(context: ConnectorContext) -> List[StepResult]: - """Run the version checks on a connector. - - Args: - context (ConnectorContext): The current connector context. - - Returns: - List[StepResult]: The results of the version checks steps. - """ - return [await VersionFollowsSemverCheck(context).run(), await VersionIncrementCheck(context).run()] - - -async def run_qa_checks(context: ConnectorContext) -> List[StepResult]: - """Run the QA checks on a connector. +def get_test_steps(context: ConnectorContext) -> List[StepToRun]: + """Get all the tests steps according to the connector language. Args: context (ConnectorContext): The current connector context. 
Returns: - List[StepResult]: The results of the QA checks steps. + List[StepToRun]: The list of test steps. """ - return [await QaChecks(context).run()] - - -async def run_all_tests(context: ConnectorContext) -> List[StepResult]: - """Run all the tests steps according to the connector language. - - Args: - context (ConnectorContext): The current connector context. - - Returns: - List[StepResult]: The results of the tests steps. - """ - if _run_all_tests := LANGUAGE_MAPPING["run_all_tests"].get(context.connector.language): - return await _run_all_tests(context) + if _get_test_steps := LANGUAGE_MAPPING["get_test_steps"].get(context.connector.language): + return _get_test_steps(context) else: context.logger.warning(f"No tests defined for connector language {context.connector.language}!") return [] -async def run_connector_test_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore) -> ConnectorReport: - """Run a test pipeline for a single connector. +async def run_connector_test_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore): + """ + Compute the steps to run for a connector test pipeline. + """ - A visual DAG can be found on the README.md file of the pipelines modules. + steps_to_run = get_test_steps(context) - Args: - context (ConnectorContext): The initialized connector context. + if not context.code_tests_only: + steps_to_run += [ + [ + StepToRun(id=CONNECTOR_TEST_STEP_ID.METADATA_VALIDATION, step=MetadataValidation(context)), + StepToRun(id=CONNECTOR_TEST_STEP_ID.VERSION_FOLLOW_CHECK, step=VersionFollowsSemverCheck(context)), + StepToRun(id=CONNECTOR_TEST_STEP_ID.VERSION_INC_CHECK, step=VersionIncrementCheck(context)), + StepToRun(id=CONNECTOR_TEST_STEP_ID.QA_CHECKS, step=QaChecks(context)), + ] + ] - Returns: - ConnectorReport: The test reports holding tests results.
- """ async with semaphore: async with context: - async with asyncer.create_task_group() as task_group: - tasks = [task_group.soonify(run_all_tests)(context)] - if not context.code_tests_only: - tasks += [ - task_group.soonify(run_metadata_validation)(context), - task_group.soonify(run_version_checks)(context), - task_group.soonify(run_qa_checks)(context), - ] - results = list(itertools.chain(*(task.value for task in tasks))) + result_dict = await run_steps( + runnables=steps_to_run, + options=context.run_step_options, + ) + + results = result_dict.values() context.report = ConnectorReport(context, steps_results=results, name="TEST RESULTS") return context.report diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py index 8d384d6bbd60..d11658e06786 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py @@ -287,7 +287,7 @@ async def _build_connector_acceptance_test(self, connector_under_test_container: .with_new_file("/tmp/container_id.txt", str(connector_container_id)) .with_workdir("/test_input") .with_mounted_directory("/test_input", test_input) - .with_(await secrets.mounted_connector_secrets(self.context, "/test_input/secrets")) + .with_(await secrets.mounted_connector_secrets(self.context, self.CONTAINER_SECRETS_DIRECTORY)) ) if "_EXPERIMENTAL_DAGGER_RUNNER_HOST" in os.environ: self.context.logger.info("Using experimental dagger runner host to run CAT with dagger-in-dagger") diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py index 5e8babe25223..1598b6fe9c41 100644 --- 
a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py @@ -4,10 +4,9 @@ """This module groups steps made to run tests for a specific Java connector given a test context.""" -from typing import List, Optional +from typing import Callable, List, Optional import anyio -import asyncer from dagger import Directory, File, QueryError from pipelines.airbyte_ci.connectors.build_image.steps.java_connectors import ( BuildConnectorDistributionTar, @@ -15,12 +14,14 @@ dist_tar_directory_path, ) from pipelines.airbyte_ci.connectors.build_image.steps.normalization import BuildOrPullNormalization +from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID from pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests from pipelines.airbyte_ci.steps.gradle import GradleTask from pipelines.consts import LOCAL_BUILD_PLATFORM from pipelines.dagger.actions import secrets from pipelines.dagger.actions.system import docker +from pipelines.helpers.run_steps import RESULTS_DICT, StepToRun from pipelines.helpers.utils import export_container_to_tarball from pipelines.models.steps import StepResult, StepStatus @@ -65,66 +66,94 @@ class UnitTests(GradleTask): bind_to_docker_host = True -async def run_all_tests(context: ConnectorContext) -> List[StepResult]: - """Run all tests for a Java connectors. - - - Build the normalization image if the connector supports it. - - Run unit tests with Gradle. - - Build connector image with Gradle. - - Run integration and acceptance test in parallel using the built connector and normalization images. - - Args: - context (ConnectorContext): The current connector context. - - Returns: - List[StepResult]: The results of all the tests steps. 
+def _create_integration_step_args_factory(context: ConnectorContext) -> Callable[[RESULTS_DICT], dict]: + """ + Create a function that can process the args for the integration step. """ - context.connector_secrets = await secrets.get_connector_secrets(context) - step_results = [] - - build_distribution_tar_result = await BuildConnectorDistributionTar(context).run() - step_results.append(build_distribution_tar_result) - if build_distribution_tar_result.status is StepStatus.FAILURE: - return step_results - dist_tar_dir = build_distribution_tar_result.output_artifact.directory(dist_tar_directory_path(context)) + async def _create_integration_step_args(results: RESULTS_DICT): - async def run_docker_build_dependent_steps(dist_tar_dir: Directory) -> List[StepResult]: - step_results = [] - build_connector_image_results = await BuildConnectorImages(context).run(dist_tar_dir) - step_results.append(build_connector_image_results) - if build_connector_image_results.status is StepStatus.FAILURE: - return step_results + connector_container = results["build"].output_artifact[LOCAL_BUILD_PLATFORM] + connector_image_tar_file, _ = await export_container_to_tarball(context, connector_container, LOCAL_BUILD_PLATFORM) if context.connector.supports_normalization: - normalization_image = f"{context.connector.normalization_repository}:dev" - context.logger.info(f"This connector supports normalization: will build {normalization_image}.") - build_normalization_results = await BuildOrPullNormalization(context, normalization_image, LOCAL_BUILD_PLATFORM).run() + tar_file_name = f"{context.connector.normalization_repository}_{context.git_revision}.tar" + build_normalization_results = results["build_normalization"] + normalization_container = build_normalization_results.output_artifact normalization_tar_file, _ = await export_container_to_tarball( - context, - normalization_container, - LOCAL_BUILD_PLATFORM, - 
tar_file_name=f"{context.connector.normalization_repository}_{context.git_revision}.tar", + context, normalization_container, LOCAL_BUILD_PLATFORM, tar_file_name=tar_file_name ) - step_results.append(build_normalization_results) else: normalization_tar_file = None - connector_container = build_connector_image_results.output_artifact[LOCAL_BUILD_PLATFORM] - connector_image_tar_file, _ = await export_container_to_tarball(context, connector_container, LOCAL_BUILD_PLATFORM) + return {"connector_tar_file": connector_image_tar_file, "normalization_tar_file": normalization_tar_file} - async with asyncer.create_task_group() as docker_build_dependent_group: - soon_integration_tests_results = docker_build_dependent_group.soonify(IntegrationTests(context).run)( - connector_tar_file=connector_image_tar_file, normalization_tar_file=normalization_tar_file - ) - soon_cat_results = docker_build_dependent_group.soonify(AcceptanceTests(context, True).run)(connector_container) + return _create_integration_step_args + + +def _get_normalization_steps(context: ConnectorContext) -> List[StepToRun]: + normalization_image = f"{context.connector.normalization_repository}:dev" + context.logger.info(f"This connector supports normalization: will build {normalization_image}.") + normalization_steps = [ + StepToRun( + id=CONNECTOR_TEST_STEP_ID.BUILD_NORMALIZATION, + step=BuildOrPullNormalization(context, normalization_image, LOCAL_BUILD_PLATFORM), + depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], + ) + ] + + return normalization_steps - step_results += [soon_cat_results.value, soon_integration_tests_results.value] - return step_results - async with asyncer.create_task_group() as test_task_group: - soon_unit_tests_result = test_task_group.soonify(UnitTests(context).run)() - soon_docker_build_dependent_steps_results = test_task_group.soonify(run_docker_build_dependent_steps)(dist_tar_dir) +def _get_acceptance_test_steps(context: ConnectorContext) -> List[StepToRun]: + """ + Generate the steps to run 
the acceptance tests for a Java connector. + """ + # Run tests in parallel + return [ + StepToRun( + id=CONNECTOR_TEST_STEP_ID.INTEGRATION, + step=IntegrationTests(context), + args=_create_integration_step_args_factory(context), + depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], + ), + StepToRun( + id=CONNECTOR_TEST_STEP_ID.ACCEPTANCE, + step=AcceptanceTests(context, True), + args=lambda results: { + "connector_under_test_container": results[CONNECTOR_TEST_STEP_ID.BUILD].output_artifact[LOCAL_BUILD_PLATFORM] + }, + depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], + ), + ] + + +def get_test_steps(context: ConnectorContext) -> List[StepToRun]: + """ + Get all the tests steps for a Java connector. + """ - return step_results + [soon_unit_tests_result.value] + soon_docker_build_dependent_steps_results.value + steps = [ + [StepToRun(id=CONNECTOR_TEST_STEP_ID.BUILD_TAR, step=BuildConnectorDistributionTar(context))], + [StepToRun(id=CONNECTOR_TEST_STEP_ID.UNIT, step=UnitTests(context), depends_on=[CONNECTOR_TEST_STEP_ID.BUILD_TAR])], + [ + StepToRun( + id=CONNECTOR_TEST_STEP_ID.BUILD, + step=BuildConnectorImages(context), + args=lambda results: { + "dist_dir": results[CONNECTOR_TEST_STEP_ID.BUILD_TAR].output_artifact.directory(dist_tar_directory_path(context)) + }, + depends_on=[CONNECTOR_TEST_STEP_ID.BUILD_TAR], + ), + ], + ] + + if context.connector.supports_normalization: + normalization_steps = _get_normalization_steps(context) + steps.append(normalization_steps) + + acceptance_test_steps = _get_acceptance_test_steps(context) + steps.append(acceptance_test_steps) + + return steps diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py index 769eb8575146..a97950b80d21 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py +++ 
b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py @@ -5,17 +5,20 @@ """This module groups steps made to run tests for a specific Python connector given a test context.""" from abc import ABC, abstractmethod -from typing import Callable, Iterable, List, Tuple +from dataclasses import dataclass +from typing import Callable, Dict, Iterable, List, Tuple, Union import asyncer import pipelines.dagger.actions.python.common import pipelines.dagger.actions.system.docker from dagger import Container, File from pipelines.airbyte_ci.connectors.build_image.steps.python_connectors import BuildConnectorImages +from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID from pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, CheckBaseImageIsUsed from pipelines.consts import LOCAL_BUILD_PLATFORM from pipelines.dagger.actions import secrets +from pipelines.helpers.run_steps import StepToRun from pipelines.models.steps import Step, StepResult, StepStatus @@ -193,35 +196,37 @@ class IntegrationTests(PytestStep): bind_to_docker_host = True -async def run_all_tests(context: ConnectorContext) -> List[StepResult]: - """Run all tests for a Python connector. - - Args: - context (ConnectorContext): The current connector context. - - Returns: - List[StepResult]: The results of all the steps that ran or were skipped. 
+def get_test_steps(context: ConnectorContext) -> List[StepToRun]: """ - step_results = [] - build_connector_image_results = await BuildConnectorImages(context).run() - if build_connector_image_results.status is StepStatus.FAILURE: - return [build_connector_image_results] - step_results.append(build_connector_image_results) - - connector_container = build_connector_image_results.output_artifact[LOCAL_BUILD_PLATFORM] - - context.connector_secrets = await secrets.get_connector_secrets(context) - - unit_test_results = await UnitTests(context).run(connector_container) - - if unit_test_results.status is StepStatus.FAILURE: - return step_results + [unit_test_results] - step_results.append(unit_test_results) - async with asyncer.create_task_group() as task_group: - tasks = [ - task_group.soonify(IntegrationTests(context).run)(connector_container), - task_group.soonify(AcceptanceTests(context, context.concurrent_cat).run)(connector_container), - task_group.soonify(CheckBaseImageIsUsed(context).run)(), - ] - - return step_results + [task.value for task in tasks] + Get all the tests steps for a Python connector. 
+ """ + return [ + [StepToRun(id=CONNECTOR_TEST_STEP_ID.BUILD, step=BuildConnectorImages(context))], + [ + StepToRun( + id=CONNECTOR_TEST_STEP_ID.UNIT, + step=UnitTests(context), + args=lambda results: {"connector_under_test": results[CONNECTOR_TEST_STEP_ID.BUILD].output_artifact[LOCAL_BUILD_PLATFORM]}, + depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], + ) + ], + [ + StepToRun( + id=CONNECTOR_TEST_STEP_ID.INTEGRATION, + step=IntegrationTests(context), + args=lambda results: {"connector_under_test": results[CONNECTOR_TEST_STEP_ID.BUILD].output_artifact[LOCAL_BUILD_PLATFORM]}, + depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], + ), + StepToRun( + id=CONNECTOR_TEST_STEP_ID.ACCEPTANCE, + step=AcceptanceTests(context, context.concurrent_cat), + args=lambda results: { + "connector_under_test_container": results[CONNECTOR_TEST_STEP_ID.BUILD].output_artifact[LOCAL_BUILD_PLATFORM] + }, + depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], + ), + StepToRun( + id=CONNECTOR_TEST_STEP_ID.CHECK_BASE_IMAGE, step=CheckBaseImageIsUsed(context), depends_on=[CONNECTOR_TEST_STEP_ID.BUILD] + ), + ], + ] diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py index a1a5843bdfc6..d906414438ce 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py @@ -6,13 +6,14 @@ from typing import Optional import dagger +from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID from pipelines.airbyte_ci.connectors.context import ConnectorContext, PipelineContext from pipelines.airbyte_ci.steps.docker import SimpleDockerStep from pipelines.airbyte_ci.steps.poetry import PoetryRunStep from pipelines.consts import DOCS_DIRECTORY_ROOT_PATH, INTERNAL_TOOL_PATHS from pipelines.dagger.actions.python.common import with_pip_packages from pipelines.dagger.containers.python import with_python_base 
-from pipelines.helpers.steps import run_steps +from pipelines.helpers.run_steps import StepToRun, run_steps from pipelines.helpers.utils import DAGGER_CONFIG, get_secret_host_variable from pipelines.models.reports import Report from pipelines.models.steps import MountPath, Step, StepResult @@ -177,7 +178,17 @@ async def run_metadata_orchestrator_deploy_pipeline( metadata_pipeline_context.dagger_client = dagger_client.pipeline(metadata_pipeline_context.pipeline_name) async with metadata_pipeline_context: - steps = [TestOrchestrator(context=metadata_pipeline_context), DeployOrchestrator(context=metadata_pipeline_context)] + steps = [ + StepToRun( + id=CONNECTOR_TEST_STEP_ID.TEST_ORCHESTRATOR, + step=TestOrchestrator(context=metadata_pipeline_context), + ), + StepToRun( + id=CONNECTOR_TEST_STEP_ID.DEPLOY_ORCHESTRATOR, + step=DeployOrchestrator(context=metadata_pipeline_context), + depends_on=[CONNECTOR_TEST_STEP_ID.TEST_ORCHESTRATOR], + ), + ] steps_results = await run_steps(steps) metadata_pipeline_context.report = Report( pipeline_context=metadata_pipeline_context, steps_results=steps_results, name="METADATA ORCHESTRATOR DEPLOY RESULTS" diff --git a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/secrets.py b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/secrets.py index 9160aadf5a57..484f5639fc02 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/secrets.py +++ b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/secrets.py @@ -133,7 +133,7 @@ async def get_connector_secrets(context: ConnectorContext) -> dict[str, Secret]: return connector_secrets -async def mounted_connector_secrets(context: PipelineContext, secret_directory_path: str) -> Callable[[Container], Container]: +async def mounted_connector_secrets(context: ConnectorContext, secret_directory_path: str) -> Callable[[Container], Container]: # By default, mount the secrets properly as dagger secret files. 
# # This will cause the contents of these files to be scrubbed from the logs. This scrubbing comes at the cost of @@ -156,11 +156,12 @@ async def mounted_connector_secrets(context: PipelineContext, secret_directory_p # [3] https://github.com/dagger/dagger/blob/v0.6.4/cmd/shim/main.go#L294 # [4] https://github.com/airbytehq/airbyte/issues/30394 # + connector_secrets = await context.get_connector_secrets() if context.is_local: # Special case for local development. # Query dagger for the contents of the secrets and mount these strings as files in the container. contents = {} - for secret_file_name, secret in context.connector_secrets.items(): + for secret_file_name, secret in connector_secrets.items(): contents[secret_file_name] = await secret.plaintext() def with_secrets_mounted_as_regular_files(container: Container) -> Container: @@ -173,7 +174,7 @@ def with_secrets_mounted_as_regular_files(container: Container) -> Container: def with_secrets_mounted_as_dagger_secrets(container: Container) -> Container: container = container.with_exec(["mkdir", "-p", secret_directory_path], skip_entrypoint=True) - for secret_file_name, secret in context.connector_secrets.items(): + for secret_file_name, secret in connector_secrets.items(): container = container.with_mounted_secret(f"{secret_directory_path}/{secret_file_name}", secret) return container diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/run_steps.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/run_steps.py new file mode 100644 index 000000000000..b9cfd7e3c1da --- /dev/null +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/run_steps.py @@ -0,0 +1,248 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+# + +"""The actions package is made to declare reusable pipeline components.""" + +from __future__ import annotations + +import inspect +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Tuple, Union + +import anyio +import asyncer +from pipelines import main_logger +from pipelines.models.steps import StepResult, StepStatus + +RESULTS_DICT = Dict[str, StepResult] +ARGS_TYPE = Union[Dict, Callable[[RESULTS_DICT], Dict], Awaitable[Dict]] + +if TYPE_CHECKING: + from pipelines.models.steps import Step, StepResult + + +@dataclass +class RunStepOptions: + """Options for the run_step function.""" + + fail_fast: bool = True + skip_steps: List[str] = field(default_factory=list) + log_step_tree: bool = True + concurrency: int = 10 + + +@dataclass(frozen=True) +class StepToRun: + """ + A class to wrap a Step with its id and args. + + Used to coordinate the execution of multiple steps inside a pipeline. + """ + + id: str + step: Step + args: ARGS_TYPE = field(default_factory=dict) + depends_on: List[str] = field(default_factory=list) + + +STEP_TREE = List[StepToRun | List[StepToRun]] + + +async def evaluate_run_args(args: ARGS_TYPE, results: RESULTS_DICT) -> Dict: + """ + Evaluate the args of a StepToRun using the results of previous steps. + """ + if inspect.iscoroutinefunction(args): + return await args(results) + elif callable(args): + return args(results) + elif isinstance(args, dict): + return args + + raise TypeError(f"Unexpected args type: {type(args)}") + + +def _skip_remaining_steps(remaining_steps: STEP_TREE) -> bool: + """ + Skip all remaining steps. 
+ """ + skipped_results = {} + for runnable_step in remaining_steps: + if isinstance(runnable_step, StepToRun): + skipped_results[runnable_step.id] = runnable_step.step.skip() + elif isinstance(runnable_step, list): + nested_skipped_results = _skip_remaining_steps(runnable_step) + skipped_results = {**skipped_results, **nested_skipped_results} + else: + raise Exception(f"Unexpected step type: {type(runnable_step)}") + + return skipped_results + + +def _step_dependencies_succeeded(depends_on: List[str], results: RESULTS_DICT) -> bool: + """ + Check if all dependencies of a step have succeeded. + """ + main_logger.info(f"Checking if dependencies {depends_on} have succeeded") + return all(results.get(step_id) and results.get(step_id).status is StepStatus.SUCCESS for step_id in depends_on) + + +def _filter_skipped_steps(steps_to_evaluate: STEP_TREE, skip_steps: List[str], results: RESULTS_DICT) -> Tuple[STEP_TREE, RESULTS_DICT]: + """ + Filter out steps that should be skipped. + + Either because they are in the skip list or because one of their dependencies failed. 
+ """ + steps_to_run = [] + for step_to_eval in steps_to_evaluate: + + # ignore nested steps + if isinstance(step_to_eval, list): + steps_to_run.append(step_to_eval) + continue + + # skip step if its id is in the skip list + if step_to_eval.id in skip_steps: + main_logger.info(f"Skipping step {step_to_eval.id}") + results[step_to_eval.id] = step_to_eval.step.skip("Skipped by user") + + # skip step if a dependency failed + elif not _step_dependencies_succeeded(step_to_eval.depends_on, results): + main_logger.info( + f"Skipping step {step_to_eval.id} because one of the dependencies have not been met: {step_to_eval.depends_on}" + ) + results[step_to_eval.id] = step_to_eval.step.skip("Skipped because a dependency was not met") + + else: + steps_to_run.append(step_to_eval) + + return steps_to_run, results + + +def _get_next_step_group(steps: STEP_TREE) -> Tuple[STEP_TREE, STEP_TREE]: + """ + Get the next group of steps to run concurrently. + """ + if not steps: + return [], [] + + if isinstance(steps[0], list): + return steps[0], steps[1:] + else: + # Termination case: if the next step is not a list that means we have reached the max depth + return steps, [] + + +def _log_step_tree(step_tree: STEP_TREE, options: RunStepOptions, depth: int = 0): + """ + Log the step tree to the console. + + e.g. + Step tree + - step1 + - step2 + - step3 + - step4 (skip) + - step5 + - step6 + """ + indent = " " + for steps in step_tree: + if isinstance(steps, list): + _log_step_tree(steps, options, depth + 1) + else: + if steps.id in options.skip_steps: + main_logger.info(f"{indent * depth}- {steps.id} (skip)") + else: + main_logger.info(f"{indent * depth}- {steps.id}") + + +async def run_steps( + runnables: STEP_TREE, + results: RESULTS_DICT = {}, + options: RunStepOptions = RunStepOptions(), +) -> RESULTS_DICT: + """Run multiple steps sequentially, or in parallel if steps are wrapped into a sublist. 
+ + Examples + -------- + >>> from pipelines.models.steps import Step, StepResult, StepStatus + >>> class TestStep(Step): + ... async def _run(self) -> StepResult: + ... return StepResult(self, StepStatus.SUCCESS) + >>> steps = [ + ... StepToRun(id="step1", step=TestStep()), + ... [ + ... StepToRun(id="step2", step=TestStep()), + ... StepToRun(id="step3", step=TestStep()), + ... ], + ... StepToRun(id="step4", step=TestStep()), + ... ] + >>> results = await run_steps(steps) + >>> results["step1"].status + + >>> results["step2"].status + + >>> results["step3"].status + + >>> results["step4"].status + + + + Args: + runnables (List[StepToRun]): List of steps to run. + results (RESULTS_DICT, optional): Dictionary of step results, used for recursion. + + Returns: + RESULTS_DICT: Dictionary of step results. + """ + # If there are no steps to run, return the results + if not runnables: + return results + + # Log the step tree + if options.log_step_tree: + main_logger.info(f"STEP TREE: {runnables}") + _log_step_tree(runnables, options) + options.log_step_tree = False + + # If any of the previous steps failed, skip the remaining steps + if options.fail_fast and any(result.status is StepStatus.FAILURE for result in results.values()): + skipped_results = _skip_remaining_steps(runnables) + return {**results, **skipped_results} + + # Pop the next step to run + steps_to_evaluate, remaining_steps = _get_next_step_group(runnables) + + # Remove any skipped steps + steps_to_run, results = _filter_skipped_steps(steps_to_evaluate, options.skip_steps, results) + + # Run all steps in list concurrently + semaphore = anyio.Semaphore(options.concurrency) + async with semaphore: + async with asyncer.create_task_group() as task_group: + tasks = [] + for step_to_run in steps_to_run: + # if the step to run is a list, run it in parallel + if isinstance(step_to_run, list): + tasks.append(task_group.soonify(run_steps)(step_to_run, results, options)) + else: + step_args = await 
evaluate_run_args(step_to_run.args, results) + main_logger.info(f"QUEUING STEP {step_to_run.id}") + tasks.append(task_group.soonify(step_to_run.step.run)(**step_args)) + + # Apply new results + new_results = {} + for i, task in enumerate(tasks): + step_to_run = steps_to_run[i] + if isinstance(step_to_run, list): + new_results = {**new_results, **task.value} + else: + new_results[step_to_run.id] = task.value + + return await run_steps( + runnables=remaining_steps, + results={**results, **new_results}, + options=options, + ) diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/steps.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/steps.py deleted file mode 100644 index 95ad77c0b996..000000000000 --- a/airbyte-ci/connectors/pipelines/pipelines/helpers/steps.py +++ /dev/null @@ -1,62 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -"""The actions package is made to declare reusable pipeline components.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING, List, Tuple, Union - -import asyncer -from pipelines.models.steps import Step, StepStatus - -if TYPE_CHECKING: - from pipelines.models.steps import StepResult - - -async def run_steps( - steps_and_run_args: List[Union[Step, Tuple[Step, Tuple]] | List[Union[Step, Tuple[Step, Tuple]]]], results: List[StepResult] = [] -) -> List[StepResult]: - """Run multiple steps sequentially, or in parallel if steps are wrapped into a sublist. - - Args: - steps_and_run_args (List[Union[Step, Tuple[Step, Tuple]] | List[Union[Step, Tuple[Step, Tuple]]]]): List of steps to run, if steps are wrapped in a sublist they will be executed in parallel. run function arguments can be passed as a tuple along the Step instance. - results (List[StepResult], optional): List of step results, used for recursion. - - Returns: - List[StepResult]: List of step results. 
- """ - # If there are no steps to run, return the results - if not steps_and_run_args: - return results - - # If any of the previous steps failed, skip the remaining steps - if any(result.status is StepStatus.FAILURE for result in results): - skipped_results = [] - for step_and_run_args in steps_and_run_args: - if isinstance(step_and_run_args, Tuple): - skipped_results.append(step_and_run_args[0].skip()) - else: - skipped_results.append(step_and_run_args.skip()) - return results + skipped_results - - # Pop the next step to run - steps_to_run, remaining_steps = steps_and_run_args[0], steps_and_run_args[1:] - - # wrap the step in a list if it is not already (allows for parallel steps) - if not isinstance(steps_to_run, list): - steps_to_run = [steps_to_run] - - async with asyncer.create_task_group() as task_group: - tasks = [] - for step in steps_to_run: - if isinstance(step, Step): - tasks.append(task_group.soonify(step.run)()) - elif isinstance(step, Tuple) and isinstance(step[0], Step) and isinstance(step[1], Tuple): - step, run_args = step - tasks.append(task_group.soonify(step.run)(*run_args)) - - new_results = [task.value for task in tasks] - - return await run_steps(remaining_steps, results + new_results) diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py index 29fefe8d3679..c22f54132bd8 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py +++ b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py @@ -17,6 +17,7 @@ from pipelines.consts import CIContext, ContextState from pipelines.helpers.gcs import sanitize_gcs_credentials from pipelines.helpers.github import update_commit_status_check +from pipelines.helpers.run_steps import RunStepOptions from pipelines.helpers.slack import send_message_to_webhook from pipelines.helpers.utils import AIRBYTE_REPO_URL from 
pipelines.models.reports import Report @@ -61,6 +62,7 @@ def __init__( ci_gcs_credentials: Optional[str] = None, ci_git_user: Optional[str] = None, ci_github_access_token: Optional[str] = None, + run_step_options: RunStepOptions = RunStepOptions(), enable_report_auto_open: bool = True, ): """Initialize a pipeline context. @@ -104,6 +106,7 @@ def __init__( self.started_at = None self.stopped_at = None self.secrets_to_mask = [] + self.run_step_options = run_step_options self.enable_report_auto_open = enable_report_auto_open update_commit_status_check(**self.github_commit_status) diff --git a/airbyte-ci/connectors/pipelines/pyproject.toml b/airbyte-ci/connectors/pipelines/pyproject.toml index b3015060cb5b..f8a5fc5a33e7 100644 --- a/airbyte-ci/connectors/pipelines/pyproject.toml +++ b/airbyte-ci/connectors/pipelines/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pipelines" -version = "2.10.12" +version = "2.11.0" description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines" authors = ["Airbyte "] diff --git a/airbyte-ci/connectors/pipelines/tests/test_helpers/test_run_steps.py b/airbyte-ci/connectors/pipelines/tests/test_helpers/test_run_steps.py new file mode 100644 index 000000000000..744445320840 --- /dev/null +++ b/airbyte-ci/connectors/pipelines/tests/test_helpers/test_run_steps.py @@ -0,0 +1,338 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ +import time + +import anyio +import pytest +from pipelines.helpers.run_steps import RunStepOptions, StepToRun, run_steps +from pipelines.models.contexts.pipeline_context import PipelineContext +from pipelines.models.steps import Step, StepResult, StepStatus + +test_context = PipelineContext(pipeline_name="test", is_local=True, git_branch="test", git_revision="test") + + +class TestStep(Step): + title = "Test Step" + + async def _run(self, result_status=StepStatus.SUCCESS) -> StepResult: + return StepResult(self, result_status) + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "desc, steps, expected_results, options", + [ + ( + "All consecutive steps succeed", + [ + [StepToRun(id="step1", step=TestStep(test_context))], + [StepToRun(id="step2", step=TestStep(test_context))], + [StepToRun(id="step3", step=TestStep(test_context))], + [StepToRun(id="step4", step=TestStep(test_context))], + ], + {"step1": StepStatus.SUCCESS, "step2": StepStatus.SUCCESS, "step3": StepStatus.SUCCESS, "step4": StepStatus.SUCCESS}, + RunStepOptions(fail_fast=True), + ), + ( + "Steps all succeed with parallel steps", + [ + [StepToRun(id="step1", step=TestStep(test_context))], + [ + StepToRun(id="step2", step=TestStep(test_context)), + StepToRun(id="step3", step=TestStep(test_context)), + ], + [StepToRun(id="step4", step=TestStep(test_context))], + ], + {"step1": StepStatus.SUCCESS, "step2": StepStatus.SUCCESS, "step3": StepStatus.SUCCESS, "step4": StepStatus.SUCCESS}, + RunStepOptions(fail_fast=True), + ), + ( + "Steps after a failed step are skipped, when fail_fast is True", + [ + [StepToRun(id="step1", step=TestStep(test_context))], + [StepToRun(id="step2", step=TestStep(test_context), args={"result_status": StepStatus.FAILURE})], + [StepToRun(id="step3", step=TestStep(test_context))], + [StepToRun(id="step4", step=TestStep(test_context))], + ], + {"step1": StepStatus.SUCCESS, "step2": StepStatus.FAILURE, "step3": StepStatus.SKIPPED, "step4": StepStatus.SKIPPED}, + 
RunStepOptions(fail_fast=True), + ), + ( + "Steps after a failed step are not skipped, when fail_fast is False", + [ + [StepToRun(id="step1", step=TestStep(test_context))], + [StepToRun(id="step2", step=TestStep(test_context), args={"result_status": StepStatus.FAILURE})], + [StepToRun(id="step3", step=TestStep(test_context))], + [StepToRun(id="step4", step=TestStep(test_context))], + ], + {"step1": StepStatus.SUCCESS, "step2": StepStatus.FAILURE, "step3": StepStatus.SUCCESS, "step4": StepStatus.SUCCESS}, + RunStepOptions(fail_fast=False), + ), + ( + "fail fast has no effect on parallel steps", + [ + [StepToRun(id="step1", step=TestStep(test_context))], + [ + StepToRun(id="step2", step=TestStep(test_context)), + StepToRun(id="step3", step=TestStep(test_context)), + ], + [StepToRun(id="step4", step=TestStep(test_context))], + ], + {"step1": StepStatus.SUCCESS, "step2": StepStatus.SUCCESS, "step3": StepStatus.SUCCESS, "step4": StepStatus.SUCCESS}, + RunStepOptions(fail_fast=False), + ), + ( + "Nested parallel steps execute properly", + [ + [StepToRun(id="step1", step=TestStep(test_context))], + [ + [StepToRun(id="step2", step=TestStep(test_context))], + [StepToRun(id="step3", step=TestStep(test_context))], + [ + StepToRun(id="step4", step=TestStep(test_context)), + StepToRun(id="step5", step=TestStep(test_context)), + ], + ], + [StepToRun(id="step6", step=TestStep(test_context))], + ], + { + "step1": StepStatus.SUCCESS, + "step2": StepStatus.SUCCESS, + "step3": StepStatus.SUCCESS, + "step4": StepStatus.SUCCESS, + "step5": StepStatus.SUCCESS, + "step6": StepStatus.SUCCESS, + }, + RunStepOptions(fail_fast=True), + ), + ( + "When fail_fast is True, nested parallel steps skip at the first failure", + [ + [StepToRun(id="step1", step=TestStep(test_context))], + [ + [StepToRun(id="step2", step=TestStep(test_context))], + [StepToRun(id="step3", step=TestStep(test_context))], + [ + StepToRun(id="step4", step=TestStep(test_context)), + StepToRun(id="step5", 
step=TestStep(test_context), args={"result_status": StepStatus.FAILURE}), + ], + ], + [StepToRun(id="step6", step=TestStep(test_context))], + ], + { + "step1": StepStatus.SUCCESS, + "step2": StepStatus.SUCCESS, + "step3": StepStatus.SUCCESS, + "step4": StepStatus.SUCCESS, + "step5": StepStatus.FAILURE, + "step6": StepStatus.SKIPPED, + }, + RunStepOptions(fail_fast=True), + ), + ( + "When fail_fast is False, nested parallel steps do not skip at the first failure", + [ + [StepToRun(id="step1", step=TestStep(test_context))], + [ + [StepToRun(id="step2", step=TestStep(test_context))], + [StepToRun(id="step3", step=TestStep(test_context))], + [ + StepToRun(id="step4", step=TestStep(test_context)), + StepToRun(id="step5", step=TestStep(test_context), args={"result_status": StepStatus.FAILURE}), + ], + ], + [StepToRun(id="step6", step=TestStep(test_context))], + ], + { + "step1": StepStatus.SUCCESS, + "step2": StepStatus.SUCCESS, + "step3": StepStatus.SUCCESS, + "step4": StepStatus.SUCCESS, + "step5": StepStatus.FAILURE, + "step6": StepStatus.SUCCESS, + }, + RunStepOptions(fail_fast=False), + ), + ( + "When fail_fast is False, consecutive steps still operate as expected", + [ + StepToRun(id="step1", step=TestStep(test_context)), + StepToRun(id="step2", step=TestStep(test_context)), + StepToRun(id="step3", step=TestStep(test_context)), + StepToRun(id="step4", step=TestStep(test_context)), + ], + {"step1": StepStatus.SUCCESS, "step2": StepStatus.SUCCESS, "step3": StepStatus.SUCCESS, "step4": StepStatus.SUCCESS}, + RunStepOptions(fail_fast=False), + ), + ( + "skip_steps skips the specified steps", + [ + StepToRun(id="step1", step=TestStep(test_context)), + StepToRun(id="step2", step=TestStep(test_context)), + StepToRun(id="step3", step=TestStep(test_context)), + StepToRun(id="step4", step=TestStep(test_context)), + ], + {"step1": StepStatus.SUCCESS, "step2": StepStatus.SKIPPED, "step3": StepStatus.SUCCESS, "step4": StepStatus.SUCCESS}, + RunStepOptions(fail_fast=False, 
skip_steps=["step2"]), + ), + ( + "step is skipped if the dependency fails", + [ + StepToRun(id="step1", step=TestStep(test_context)), + StepToRun(id="step2", step=TestStep(test_context), args={"result_status": StepStatus.FAILURE}), + StepToRun(id="step3", step=TestStep(test_context)), + StepToRun(id="step4", step=TestStep(test_context), depends_on=["step2"]), + ], + {"step1": StepStatus.SUCCESS, "step2": StepStatus.FAILURE, "step3": StepStatus.SUCCESS, "step4": StepStatus.SKIPPED}, + RunStepOptions(fail_fast=False), + ), + ], +) +async def test_run_steps_output(desc, steps, expected_results, options): + results = await run_steps(steps, options=options) + + for step_id, expected_status in expected_results.items(): + assert results[step_id].status == expected_status, desc + + +@pytest.mark.anyio +async def test_run_steps_concurrent(): + ran_at = {} + + class SleepStep(Step): + title = "Sleep Step" + + async def _run(self, name, sleep) -> StepResult: + await anyio.sleep(sleep) + ran_at[name] = time.time() + return StepResult(self, StepStatus.SUCCESS) + + steps = [ + StepToRun(id="step1", step=SleepStep(test_context), args={"name": "step1", "sleep": 2}), + StepToRun(id="step2", step=SleepStep(test_context), args={"name": "step2", "sleep": 2}), + StepToRun(id="step3", step=SleepStep(test_context), args={"name": "step3", "sleep": 2}), + StepToRun(id="step4", step=SleepStep(test_context), args={"name": "step4", "sleep": 0}), + ] + + await run_steps(steps) + + # assert that step4 is the first step to finish + assert ran_at["step4"] < ran_at["step1"] + assert ran_at["step4"] < ran_at["step2"] + assert ran_at["step4"] < ran_at["step3"] + + +@pytest.mark.anyio +async def test_run_steps_concurrency_of_1(): + ran_at = {} + + class SleepStep(Step): + title = "Sleep Step" + + async def _run(self, name, sleep) -> StepResult: + ran_at[name] = time.time() + await anyio.sleep(sleep) + return StepResult(self, StepStatus.SUCCESS) + + steps = [ + StepToRun(id="step1", 
step=SleepStep(test_context), args={"name": "step1", "sleep": 1}), + StepToRun(id="step2", step=SleepStep(test_context), args={"name": "step2", "sleep": 1}), + StepToRun(id="step3", step=SleepStep(test_context), args={"name": "step3", "sleep": 1}), + StepToRun(id="step4", step=SleepStep(test_context), args={"name": "step4", "sleep": 1}), + ] + + await run_steps(steps, options=RunStepOptions(concurrency=1)) + + # Assert that they run sequentially + assert ran_at["step1"] < ran_at["step2"] + assert ran_at["step2"] < ran_at["step3"] + assert ran_at["step3"] < ran_at["step4"] + + +@pytest.mark.anyio +async def test_run_steps_sequential(): + ran_at = {} + + class SleepStep(Step): + title = "Sleep Step" + + async def _run(self, name, sleep) -> StepResult: + await anyio.sleep(sleep) + ran_at[name] = time.time() + return StepResult(self, StepStatus.SUCCESS) + + steps = [ + [StepToRun(id="step1", step=SleepStep(test_context), args={"name": "step1", "sleep": 1})], + [StepToRun(id="step1", step=SleepStep(test_context), args={"name": "step2", "sleep": 1})], + [StepToRun(id="step3", step=SleepStep(test_context), args={"name": "step3", "sleep": 1})], + [StepToRun(id="step4", step=SleepStep(test_context), args={"name": "step4", "sleep": 0})], + ] + + await run_steps(steps) + + # assert that steps are run in order + assert ran_at["step1"] < ran_at["step2"] + assert ran_at["step2"] < ran_at["step3"] + assert ran_at["step3"] < ran_at["step4"] + + +@pytest.mark.anyio +async def test_run_steps_passes_results(): + """ + Example pattern + StepToRun( + id=CONNECTOR_TEST_STEP_ID.INTEGRATION, + step=IntegrationTests(context), + args=_create_integration_step_args_factory(context), + depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], + ), + StepToRun( + id=CONNECTOR_TEST_STEP_ID.ACCEPTANCE, + step=AcceptanceTests(context, True), + args=lambda results: {"connector_under_test_container": results[CONNECTOR_TEST_STEP_ID.BUILD].output_artifact[LOCAL_BUILD_PLATFORM]}, + 
depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], + ), + + """ + + class Simple(Step): + title = "Test Step" + + async def _run(self, arg1, arg2) -> StepResult: + output_artifact = f"{arg1}:{arg2}" + return StepResult(self, StepStatus.SUCCESS, output_artifact=output_artifact) + + async def async_args(results): + return {"arg1": results["step2"].output_artifact, "arg2": "4"} + + steps = [ + [StepToRun(id="step1", step=Simple(test_context), args={"arg1": "1", "arg2": "2"})], + [StepToRun(id="step2", step=Simple(test_context), args=lambda results: {"arg1": results["step1"].output_artifact, "arg2": "3"})], + [StepToRun(id="step3", step=Simple(test_context), args=async_args)], + ] + + results = await run_steps(steps) + + assert results["step1"].output_artifact == "1:2" + assert results["step2"].output_artifact == "1:2:3" + assert results["step3"].output_artifact == "1:2:3:4" + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "invalid_args", + [ + 1, + True, + "string", + [1, 2], + None, + ], +) +async def test_run_steps_throws_on_invalid_args(invalid_args): + steps = [ + [StepToRun(id="step1", step=TestStep(test_context), args=invalid_args)], + ] + + with pytest.raises(TypeError): + await run_steps(steps) diff --git a/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py b/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py index 7f79cdea8680..567136eeb264 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py +++ b/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py @@ -11,6 +11,7 @@ import pytest import yaml from freezegun import freeze_time +from pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.airbyte_ci.connectors.test.steps import common from pipelines.dagger.actions.system import docker from pipelines.helpers.connectors.modifed import ConnectorWithModifiedFiles @@ -39,8 +40,19 @@ def get_dummy_cat_container(dagger_client: dagger.Client, exit_code: int, secret return 
container.with_new_file("/stupid_bash_script.sh", contents=f"echo {stdout}; echo {stderr} >&2; exit {exit_code}") @pytest.fixture - def test_context(self, mocker, dagger_client): - return mocker.MagicMock(connector=ConnectorWithModifiedFiles("source-faker", frozenset()), dagger_client=dagger_client) + def test_context_ci(self, current_platform, dagger_client): + context = ConnectorContext( + pipeline_name="test", + connector=ConnectorWithModifiedFiles("source-faker", frozenset()), + git_branch="test", + git_revision="test", + report_output_prefix="test", + is_local=False, + use_remote_secrets=True, + targeted_platforms=[current_platform], + ) + context.dagger_client = dagger_client + return context @pytest.fixture def dummy_connector_under_test_container(self, dagger_client) -> dagger.Container: @@ -50,9 +62,9 @@ def dummy_connector_under_test_container(self, dagger_client) -> dagger.Containe def another_dummy_connector_under_test_container(self, dagger_client) -> dagger.File: return dagger_client.container().from_("airbyte/source-pokeapi:latest") - async def test_skipped_when_no_acceptance_test_config(self, mocker, test_context): - test_context.connector = mocker.MagicMock(acceptance_test_config=None) - acceptance_test_step = common.AcceptanceTests(test_context) + async def test_skipped_when_no_acceptance_test_config(self, mocker, test_context_ci): + test_context_ci.connector = mocker.MagicMock(acceptance_test_config=None) + acceptance_test_step = common.AcceptanceTests(test_context_ci) step_result = await acceptance_test_step._run(None) assert step_result.status == StepStatus.SKIPPED @@ -112,7 +124,7 @@ async def test_skipped_when_no_acceptance_test_config(self, mocker, test_context ) async def test__run( self, - test_context, + test_context_ci, mocker, exit_code: int, expected_status: StepStatus, @@ -122,23 +134,23 @@ async def test__run( ): """Test the behavior of the run function using a dummy container.""" cat_container = self.get_dummy_cat_container( - 
test_context.dagger_client, exit_code, secrets_file_names, stdout="hello", stderr="world" + test_context_ci.dagger_client, exit_code, secrets_file_names, stdout="hello", stderr="world" ) async_mock = mocker.AsyncMock(return_value=cat_container) mocker.patch.object(common.AcceptanceTests, "_build_connector_acceptance_test", side_effect=async_mock) mocker.patch.object(common.AcceptanceTests, "get_cat_command", return_value=["bash", "/stupid_bash_script.sh"]) - test_context.get_connector_dir = mocker.AsyncMock(return_value=test_input_dir) - acceptance_test_step = common.AcceptanceTests(test_context) + test_context_ci.get_connector_dir = mocker.AsyncMock(return_value=test_input_dir) + acceptance_test_step = common.AcceptanceTests(test_context_ci) step_result = await acceptance_test_step._run(None) assert step_result.status == expected_status assert step_result.stdout.strip() == "hello" assert step_result.stderr.strip() == "world" if expect_updated_secrets: assert ( - await test_context.updated_secrets_dir.entries() + await test_context_ci.updated_secrets_dir.entries() == await cat_container.directory(f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}").entries() ) - assert any("updated_configurations" in str(file_name) for file_name in await test_context.updated_secrets_dir.entries()) + assert any("updated_configurations" in str(file_name) for file_name in await test_context_ci.updated_secrets_dir.entries()) @pytest.fixture def test_input_dir(self, dagger_client, tmpdir): @@ -146,17 +158,18 @@ def test_input_dir(self, dagger_client, tmpdir): yaml.safe_dump({"connector_image": "airbyte/connector_under_test_image:dev"}, f) return dagger_client.host().directory(str(tmpdir)) - def get_patched_acceptance_test_step(self, dagger_client, mocker, test_context, test_input_dir): - test_context.get_connector_dir = mocker.AsyncMock(return_value=test_input_dir) - test_context.connector_acceptance_test_image = "bash:latest" - test_context.connector_secrets = {"config.json": 
dagger_client.set_secret("config.json", "connector_secret")} + def get_patched_acceptance_test_step(self, dagger_client, mocker, test_context_ci, test_input_dir): + test_secrets = {"config.json": dagger_client.set_secret("config.json", "connector_secret")} + test_context_ci.get_connector_dir = mocker.AsyncMock(return_value=test_input_dir) + test_context_ci.connector_acceptance_test_image = "bash:latest" + test_context_ci.get_connector_secrets = mocker.AsyncMock(return_value=test_secrets) mocker.patch.object(docker, "load_image_to_docker_host", return_value="image_sha") mocker.patch.object(docker, "with_bound_docker_host", lambda _, cat_container: cat_container) - return common.AcceptanceTests(test_context) + return common.AcceptanceTests(test_context_ci) async def test_cat_container_provisioning( - self, dagger_client, mocker, test_context, test_input_dir, dummy_connector_under_test_container + self, dagger_client, mocker, test_context_ci, test_input_dir, dummy_connector_under_test_container ): """Check that the acceptance test container is correctly provisioned. We check that: @@ -168,9 +181,8 @@ async def test_cat_container_provisioning( # The mounted_connector_secrets behaves differently when the test is run locally or in CI. # It is not masking the secrets when run locally. # We want to confirm that the secrets are correctly masked when run in CI. 
- test_context.is_local = False - test_context.is_ci = True - acceptance_test_step = self.get_patched_acceptance_test_step(dagger_client, mocker, test_context, test_input_dir) + + acceptance_test_step = self.get_patched_acceptance_test_step(dagger_client, mocker, test_context_ci, test_input_dir) cat_container = await acceptance_test_step._build_connector_acceptance_test(dummy_connector_under_test_container, test_input_dir) assert (await cat_container.with_exec(["pwd"]).stdout()).strip() == acceptance_test_step.CONTAINER_TEST_INPUT_DIRECTORY test_input_ls_result = await cat_container.with_exec(["ls"]).stdout() @@ -185,7 +197,7 @@ async def test_cat_container_caching( self, dagger_client, mocker, - test_context, + test_context_ci, test_input_dir, dummy_connector_under_test_container, another_dummy_connector_under_test_container, @@ -195,7 +207,7 @@ async def test_cat_container_caching( initial_datetime = datetime.datetime(year=1992, month=6, day=19, hour=13, minute=1, second=0) with freeze_time(initial_datetime) as frozen_datetime: - acceptance_test_step = self.get_patched_acceptance_test_step(dagger_client, mocker, test_context, test_input_dir) + acceptance_test_step = self.get_patched_acceptance_test_step(dagger_client, mocker, test_context_ci, test_input_dir) cat_container = await acceptance_test_step._build_connector_acceptance_test( dummy_connector_under_test_container, test_input_dir ) diff --git a/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py b/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py index da63c33fb01c..4063c782c05f 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py +++ b/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py @@ -28,7 +28,7 @@ def certified_connector_with_setup(self, all_connectors): pytest.skip("No certified connector with setup.py found.") @pytest.fixture - def context_for_certified_connector_with_setup(self, 
certified_connector_with_setup, dagger_client, current_platform): + def context_for_certified_connector_with_setup(self, mocker, certified_connector_with_setup, dagger_client, current_platform): context = ConnectorContext( pipeline_name="test unit tests", connector=certified_connector_with_setup, @@ -40,7 +40,7 @@ def context_for_certified_connector_with_setup(self, certified_connector_with_se targeted_platforms=[current_platform], ) context.dagger_client = dagger_client - context.connector_secrets = {} + context.get_connector_secrets = mocker.AsyncMock(return_value={}) return context @pytest.fixture @@ -49,7 +49,7 @@ async def certified_container_with_setup(self, context_for_certified_connector_w return result.output_artifact[current_platform] @pytest.fixture - def context_for_connector_with_poetry(self, connector_with_poetry, dagger_client, current_platform): + def context_for_connector_with_poetry(self, mocker, connector_with_poetry, dagger_client, current_platform): context = ConnectorContext( pipeline_name="test unit tests", connector=connector_with_poetry, @@ -61,7 +61,7 @@ def context_for_connector_with_poetry(self, connector_with_poetry, dagger_client targeted_platforms=[current_platform], ) context.dagger_client = dagger_client - context.connector_secrets = {} + context.get_connector_secrets = mocker.AsyncMock(return_value={}) return context @pytest.fixture