Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed Aug 14, 2023
2 parents 31da8d2 + 12f1304 commit 6d4a612
Show file tree
Hide file tree
Showing 756 changed files with 4,933 additions and 3,332 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class Config:
title="Inference Type",
default=InferenceType.NONE,
description="How to infer the types of the columns. If none, inference default to strings.",
airbyte_hidden=True,
)

@validator("delimiter")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class DefaultFileBasedCursor(AbstractFileBasedCursor):
DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3
DEFAULT_MAX_HISTORY_SIZE = 10_000
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
CURSOR_FIELD = "_ab_source_file_last_modified"

def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
super().__init__(stream_config)
Expand Down Expand Up @@ -48,7 +49,7 @@ def add_file(self, file: RemoteFile) -> None:
)

def get_state(self) -> StreamState:
    """Return the state to persist for this cursor.

    Returns:
        StreamState: a mapping with two entries: ``"history"`` holding
        ``self._file_to_datetime_history`` and ``self.CURSOR_FIELD`` holding
        the value returned by ``self._get_cursor()``.
    """
    # Build the state once, keyed by the CURSOR_FIELD class constant. A prior
    # assignment that hard-coded the same key was immediately overwritten
    # (and called _get_cursor() a second time for nothing), so it is dropped.
    return {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()}

def _get_cursor(self) -> Optional[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,6 @@
"type": "object",
"properties": {
"filetype": {"title": "Filetype", "default": "csv", "enum": ["csv"], "type": "string"},
"inference_type": {
"default": "None",
"description": "How to infer the types of the columns. If none, inference default to strings.",
"title": "Inference Type",
"enum": [
"None",
"Primitive Types Only",
],
},
"delimiter": {
"title": "Delimiter",
"description": "The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.",
Expand Down Expand Up @@ -211,6 +202,13 @@
"items": {"type": "string"},
"uniqueItems": True,
},
"inference_type": {
"title": "Inference Type",
"description": "How to infer the types of the columns. If none, inference default to strings.",
"default": "None",
"airbyte_hidden": True,
"enum": ["None", "Primitive Types Only"],
},
},
},
{
Expand Down
1 change: 1 addition & 0 deletions airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ This command runs the Python tests for an airbyte-ci poetry package.
## Changelog
| Version | PR | Description |
| ------- | --------------------------------------------------------- | --------------------------------------------------------------------------------------------------------- |
| 0.5.0 | [#28000](https://github.com/airbytehq/airbyte/pull/28000) | Run connector acceptance tests with dagger-in-dagger. |
| 0.4.7 | [#29156](https://github.com/airbytehq/airbyte/pull/29156) | Improve how we check existence of requirement.txt or setup.py file to not raise early pip install errors. |
| 0.4.6 | [#28729](https://github.com/airbytehq/airbyte/pull/28729) | Use keyword args instead of positional argument for optional parameter in Dagger's API |
| 0.4.5 | [#29034](https://github.com/airbytehq/airbyte/pull/29034) | Disable Dagger terminal UI when running publish. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,7 @@ def with_crane(

def mounted_connector_secrets(context: PipelineContext, secret_directory_path="secrets") -> Callable:
def mounted_connector_secrets_inner(container: Container):
container = container.with_exec(["mkdir", secret_directory_path], skip_entrypoint=True)
for secret_file_name, secret in context.connector_secrets.items():
container = container.with_mounted_secret(f"{secret_directory_path}/{secret_file_name}", secret)
return container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dagger import QueryError
from pipelines.actions.environments import with_airbyte_python_connector
from pipelines.bases import StepResult, StepStatus
from pipelines.builds.common import BuildConnectorImageBase, BuildConnectorImageForAllPlatformsBase
from pipelines.contexts import ConnectorContext
from dagger import QueryError


class BuildConnectorImage(BuildConnectorImageBase):
Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pipelines/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ async def __aexit__(
class ConnectorContext(PipelineContext):
"""The connector context is used to store configuration for a specific connector pipeline run."""

DEFAULT_CONNECTOR_ACCEPTANCE_TEST_IMAGE = "airbyte/connector-acceptance-test:latest"
DEFAULT_CONNECTOR_ACCEPTANCE_TEST_IMAGE = "airbyte/connector-acceptance-test:dev"

def __init__(
self,
Expand Down
34 changes: 1 addition & 33 deletions airbyte-ci/connectors/pipelines/pipelines/hacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@

from __future__ import annotations

from copy import deepcopy
from typing import TYPE_CHECKING, Callable, List

import requests
import yaml
from connector_ops.utils import ConnectorLanguage
from dagger import DaggerError

Expand Down Expand Up @@ -60,35 +58,6 @@ async def _patch_gradle_file(context: ConnectorContext, connector_dir: Directory
return connector_dir.with_new_file("build.gradle", contents="\n".join(patched_gradle_file))


def _patch_cat_config(context: ConnectorContext, connector_dir: Directory) -> Directory:
    """Point the connector's acceptance-test-config.yml at the git-revision-tagged image.

    Background:
        acceptance-test-config.yml targets the connector image with the ``dev`` tag by default.
        To make sure the correct image is used when running the acceptance tests, the connector
        under test image is tagged with the git revision, and the config is rewritten to match.

    Hack:
        Called by patch_connector_dir, which runs every time the connector source directory
        is read by the pipeline.

    Args:
        context (ConnectorContext): The initialized connector context.
        connector_dir (Directory): The directory containing the acceptance-test-config.yml file to patch.

    Returns:
        Directory: ``connector_dir`` unchanged when the connector has no acceptance test config,
        otherwise ``connector_dir`` with a rewritten acceptance-test-config.yml.
    """
    current_config = context.connector.acceptance_test_config
    if not current_config:
        # Nothing to patch for connectors without an acceptance test config.
        return connector_dir

    context.logger.info("Patching acceptance-test-config.yml to use connector image with git revision tag and not dev.")

    # Work on a deep copy so the config object held by the connector is not modified.
    retagged_config = deepcopy(current_config)
    dev_image_name = current_config["connector_image"]
    retagged_config["connector_image"] = dev_image_name.replace(":dev", f":{context.git_revision}")

    serialized_config = yaml.safe_dump(retagged_config)
    return connector_dir.with_new_file("acceptance-test-config.yml", contents=serialized_config)


async def patch_connector_dir(context: ConnectorContext, connector_dir: Directory) -> Directory:
"""Patch a connector directory: patch cat config, gradle file and dockerfile.
Expand All @@ -99,7 +68,6 @@ async def patch_connector_dir(context: ConnectorContext, connector_dir: Director
Directory: The directory containing the patched connector.
"""
patched_connector_dir = await _patch_gradle_file(context, connector_dir)
patched_connector_dir = _patch_cat_config(context, patched_connector_dir)
return patched_connector_dir.with_timestamps(1)


Expand Down Expand Up @@ -162,6 +130,6 @@ def never_fail_exec(command: List[str]) -> Callable:
"""

def never_fail_exec_inner(container: Container):
    """Run the enclosing ``command`` via ``sh -c`` and record its exit status.

    The command's exit status is written to ``/exit_code`` by the trailing
    ``echo $? > /exit_code``, so the shell's own status is that of the
    ``echo`` rather than of the command.

    Args:
        container (Container): The container in which to execute the command.

    Returns:
        The container returned by ``with_exec``.
    """
    # A single return: the previous first return (without skip_entrypoint)
    # made this one unreachable. skip_entrypoint=True is passed so the image
    # entrypoint is not used for the `sh -c` invocation.
    return container.with_exec(["sh", "-c", f"{' '.join(command)}; echo $? > /exit_code"], skip_entrypoint=True)

return never_fail_exec_inner
79 changes: 51 additions & 28 deletions airbyte-ci/connectors/pipelines/pipelines/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""This module groups steps made to run tests agnostic to a connector language."""

import datetime
import os
from abc import ABC, abstractmethod
from functools import cached_property
from typing import ClassVar, List, Optional
Expand All @@ -13,7 +14,7 @@
import semver
import yaml
from connector_ops.utils import Connector
from dagger import Container, File
from dagger import Container, Directory, File
from pipelines import hacks
from pipelines.actions import environments
from pipelines.bases import CIContext, PytestStep, Step, StepResult, StepStatus
Expand Down Expand Up @@ -181,7 +182,7 @@ class AcceptanceTests(PytestStep):
CONTAINER_SECRETS_DIRECTORY = "/test_input/secrets"

@property
def cat_command(self) -> List[str]:
def base_cat_command(self) -> List[str]:
return [
"python",
"-m",
Expand All @@ -192,6 +193,18 @@ def cat_command(self) -> List[str]:
self.CONTAINER_TEST_INPUT_DIRECTORY,
]

async def get_cat_command(self, connector_dir: Directory) -> List[str]:
    """Return the acceptance test command, with the connector's plugin appended when it exists.

    Connectors can optionally set up or tear down resources before and after the
    acceptance tests run, through an ``acceptance.py`` module in their
    ``integration_tests`` directory. When that module is present it is passed
    to the test command as a plugin with ``-p``.

    Args:
        connector_dir (Directory): The connector directory to inspect.

    Returns:
        List[str]: The base command, extended with ``-p integration_tests.acceptance``
        when ``integration_tests/acceptance.py`` exists.
    """
    command = self.base_cat_command

    top_level_entries = await connector_dir.entries()
    if "integration_tests" not in top_level_entries:
        return command

    integration_tests_entries = await connector_dir.directory("integration_tests").entries()
    if "acceptance.py" not in integration_tests_entries:
        return command

    command.extend(["-p", "integration_tests.acceptance"])
    return command

async def _run(self, connector_under_test_image_tar: File) -> StepResult:
"""Run the acceptance test suite on a connector dev image. Build the connector acceptance test image if the tag is :dev.
Expand All @@ -203,10 +216,10 @@ async def _run(self, connector_under_test_image_tar: File) -> StepResult:
"""
if not self.context.connector.acceptance_test_config:
return StepResult(self, StepStatus.SKIPPED)

cat_container = await self._build_connector_acceptance_test(connector_under_test_image_tar)
cat_container = cat_container.with_(hacks.never_fail_exec(self.cat_command))

connector_dir = await self.context.get_connector_dir()
cat_container = await self._build_connector_acceptance_test(connector_under_test_image_tar, connector_dir)
cat_command = await self.get_cat_command(connector_dir)
cat_container = cat_container.with_(hacks.never_fail_exec(cat_command))
step_result = await self.get_step_result(cat_container)
secret_dir = cat_container.directory(self.CONTAINER_SECRETS_DIRECTORY)

Expand All @@ -217,44 +230,54 @@ async def _run(self, connector_under_test_image_tar: File) -> StepResult:
break
return step_result

def get_cache_buster(self) -> str:
async def get_cache_buster(self, connector_under_test_image_tar: File) -> str:
"""
This bursts the CAT cached results everyday.
This bursts the CAT cached results everyday and on new version or image size change.
It's cool because in case of a partially failing nightly build the connectors that already ran CAT won't re-run CAT.
We keep the guarantee that a CAT runs everyday.
Args:
connector_under_test_image_tar (File): The file holding the tar archive of the connector image.
Returns:
str: A string representing the current date.
str: A string representing the cachebuster value.
"""
return datetime.datetime.utcnow().strftime("%Y%m%d")
return (
datetime.datetime.utcnow().strftime("%Y%m%d")
+ self.context.connector.version
+ str(await connector_under_test_image_tar.size())
)

async def _build_connector_acceptance_test(self, connector_under_test_image_tar: File) -> Container:
"""Create a container to run connector acceptance tests, bound to a persistent docker host.
async def _build_connector_acceptance_test(self, connector_under_test_image_tar: File, test_input: Directory) -> Container:
"""Create a container to run connector acceptance tests.
Args:
connector_under_test_image_tar (File): The file containing the tar archive the image of the connector under test.
connector_under_test_image_tar (File): The file containing the tar archive of the image of the connector under test.
test_input (Directory): The connector under test directory.
Returns:
Container: A container with connector acceptance tests installed.
"""
test_input = await self.context.get_connector_dir()
cat_config = yaml.safe_load(await test_input.file("acceptance-test-config.yml").contents())

image_sha = await environments.load_image_to_docker_host(
self.context, connector_under_test_image_tar, cat_config["connector_image"]
)

if self.context.connector_acceptance_test_image.endswith(":dev"):
cat_container = self.context.connector_acceptance_test_source_dir.docker_build()
else:
cat_container = self.dagger_client.container().from_(self.context.connector_acceptance_test_image)

return (
environments.with_bound_docker_host(self.context, cat_container)
.with_entrypoint([])
.with_mounted_directory(self.CONTAINER_TEST_INPUT_DIRECTORY, test_input)
.with_env_variable("CONNECTOR_IMAGE_ID", image_sha)
.with_env_variable("CACHEBUSTER", self.get_cache_buster())
.with_workdir(self.CONTAINER_TEST_INPUT_DIRECTORY)
.with_exec(["mkdir", "-p", self.CONTAINER_SECRETS_DIRECTORY])
.with_(environments.mounted_connector_secrets(self.context, self.CONTAINER_SECRETS_DIRECTORY))
cat_container = (
cat_container.with_env_variable("RUN_IN_AIRBYTE_CI", "1")
.with_exec(["mkdir", "/dagger_share"], skip_entrypoint=True)
.with_env_variable("CACHEBUSTER", await self.get_cache_buster(connector_under_test_image_tar))
.with_mounted_file("/dagger_share/connector_under_test_image.tar", connector_under_test_image_tar)
.with_env_variable("CONNECTOR_UNDER_TEST_IMAGE_TAR_PATH", "/dagger_share/connector_under_test_image.tar")
.with_workdir("/test_input")
.with_mounted_directory("/test_input", test_input)
.with_(environments.mounted_connector_secrets(self.context, secret_directory_path="/test_input/secrets"))
)
if "_EXPERIMENTAL_DAGGER_RUNNER_HOST" in os.environ:
self.context.logger.info("Using experimental dagger runner host to run CAT with dagger-in-dagger")
cat_container = cat_container.with_env_variable(
"_EXPERIMENTAL_DAGGER_RUNNER_HOST", "unix:///var/run/buildkit/buildkitd.sock"
).with_unix_socket(
"/var/run/buildkit/buildkitd.sock", self.context.dagger_client.host().unix_socket("/var/run/buildkit/buildkitd.sock")
)

return cat_container.with_unix_socket("/var/run/docker.sock", self.context.dagger_client.host().unix_socket("/var/run/docker.sock"))
Loading

0 comments on commit 6d4a612

Please sign in to comment.