From d8a7437bf3c6b8652924c4baab1c5c6ef9685f59 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Mon, 16 Aug 2021 12:49:07 +0300 Subject: [PATCH 01/10] Add MultipleTokenAuthenticator --- airbyte-cdk/python/CHANGELOG.md | 3 +++ .../sources/streams/http/auth/__init__.py | 3 ++- .../sources/streams/http/auth/token.py | 15 +++++++++++++++ airbyte-cdk/python/setup.py | 2 +- .../sources/streams/http/auth/test_auth.py | 12 +++++++++++- 5 files changed, 32 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 425948d15da1..3692b2ac91a5 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.9 +Add multiple token support + ## 0.1.8 Allow to fetch primary key info from singer catalog diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/__init__.py index 7c19ffa742c2..8c81c588e0ef 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/__init__.py @@ -1,11 +1,12 @@ # Initialize Auth Package from .core import HttpAuthenticator, NoAuth from .oauth import Oauth2Authenticator -from .token import TokenAuthenticator +from .token import MultipleTokenAuthenticator, TokenAuthenticator __all__ = [ "HttpAuthenticator", "NoAuth", "Oauth2Authenticator", "TokenAuthenticator", + "MultipleTokenAuthenticator", ] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py index beceda46b3d3..1cfac34de787 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py @@ -23,11 +23,15 @@ # +from itertools import cycle from typing import Any, Mapping from .core import HttpAuthenticator +TOKEN_SEPARATOR = "," + + class TokenAuthenticator(HttpAuthenticator): def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"): self.auth_method = auth_method @@ -36,3 +40,14 @@ def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = " def get_auth_header(self) -> Mapping[str, Any]: return {self.auth_header: f"{self.auth_method} {self._token}"} + + +class MultipleTokenAuthenticator(HttpAuthenticator): + def __init__(self, tokens: str, auth_method: str = "Bearer", auth_header: str = "Authorization"): + self.auth_method = auth_method + self.auth_header = auth_header + self._tokens = [line.strip() for line in tokens.split(TOKEN_SEPARATOR)] + self._tokens_iter = cycle(self._tokens) + + def get_auth_header(self) -> Mapping[str, Any]: + return {self.auth_header: f"{self.auth_method} {next(self._tokens_iter)}"} diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index b554ccf9fbb3..0ef97dfa9af5 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -35,7 +35,7 @@ setup( name="airbyte-cdk", - version="0.1.8", + version="0.1.9", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py b/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py index c6f4e7589886..107dfb09b44e 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py @@ -26,7 +26,7 @@ import logging import requests -from airbyte_cdk.sources.streams.http.auth import NoAuth, Oauth2Authenticator, TokenAuthenticator +from airbyte_cdk.sources.streams.http.auth import NoAuth, Oauth2Authenticator, TokenAuthenticator, MultipleTokenAuthenticator from requests import Response LOGGER = logging.getLogger(__name__) @@ -43,6 +43,16 @@ def test_token_authenticator(): assert {"Authorization": "Bearer test-token"} == header +def test_multiple_token_authenticator(): + token = MultipleTokenAuthenticator("token1, token2") + header1 = token.get_auth_header() + assert {"Authorization": "Bearer token1"} == header1 + header2 = token.get_auth_header() + assert {"Authorization": "Bearer token2"} == header2 + header3 = token.get_auth_header() + assert {"Authorization": "Bearer token1"} == header3 + + def test_no_auth(): """ Should always return empty body, no matter how many times token is retrieved. From db9609cfa72cc9530bc664ce01fb58fbaf730e91 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Mon, 16 Aug 2021 17:09:00 +0300 Subject: [PATCH 02/10] Upd docs --- airbyte-cdk/python/README.md | 2 +- docs/connector-development/cdk-python/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/README.md b/airbyte-cdk/python/README.md index 10fe5d93e1ca..bf0e267fdb8f 100644 --- a/airbyte-cdk/python/README.md +++ b/airbyte-cdk/python/README.md @@ -74,7 +74,7 @@ All tests are located in the `unit_tests` directory. Run `pytest --cov=airbyte_c 1. Bump the package version in `setup.py` 2. Open a PR -3. An Airbyte member must comment `/publish-cdk --dry-run=`. Dry runs publish to test.pypi.org. +3. An Airbyte member must comment `/publish-cdk dry-run=`. Dry runs publish to test.pypi.org. ## Coming Soon diff --git a/docs/connector-development/cdk-python/README.md b/docs/connector-development/cdk-python/README.md index 526fd67ab3e8..45b61591c2ea 100644 --- a/docs/connector-development/cdk-python/README.md +++ b/docs/connector-development/cdk-python/README.md @@ -94,7 +94,7 @@ All tests are located in the `unit_tests` directory. Run `pytest --cov=airbyte_c 1. Bump the package version in `setup.py` 2. Open a PR -3. An Airbyte member must comment `/publish-cdk --dry-run=`. Dry runs publish to test.pypi.org. +3. An Airbyte member must comment `/publish-cdk dry-run=`. Dry runs publish to test.pypi.org. ## Coming Soon From 278e0afe798fcf9f2ed58e2a44601cc134b2a6e7 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Mon, 16 Aug 2021 17:15:22 +0300 Subject: [PATCH 03/10] Refactor --- .../python/airbyte_cdk/sources/streams/http/auth/token.py | 1 - .../python/unit_tests/sources/streams/http/auth/test_auth.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py index 1cfac34de787..2c6394dbaed6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py @@ -28,7 +28,6 @@ from .core import HttpAuthenticator - TOKEN_SEPARATOR = "," diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py b/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py index 107dfb09b44e..21f7ca2ff1cc 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py @@ -26,7 +26,7 @@ import logging import requests -from airbyte_cdk.sources.streams.http.auth import NoAuth, Oauth2Authenticator, TokenAuthenticator, MultipleTokenAuthenticator +from airbyte_cdk.sources.streams.http.auth import MultipleTokenAuthenticator, NoAuth, Oauth2Authenticator, TokenAuthenticator from requests import Response LOGGER = logging.getLogger(__name__) From a57965d6cab60032b2e459cdf8b24c29b6fde39d Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Mon, 16 Aug 2021 17:21:28 +0300 Subject: [PATCH 04/10] Upd docs --- airbyte-cdk/python/README.md | 2 +- docs/connector-development/cdk-python/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/README.md b/airbyte-cdk/python/README.md index bf0e267fdb8f..e848bf96264e 100644 --- a/airbyte-cdk/python/README.md +++ b/airbyte-cdk/python/README.md @@ -74,7 +74,7 @@ All tests are located in the `unit_tests` directory. Run `pytest --cov=airbyte_c 1. Bump the package version in `setup.py` 2. Open a PR -3. An Airbyte member must comment `/publish-cdk dry-run=`. Dry runs publish to test.pypi.org. +3. An Airbyte member must comment `/publish-cdk dry-run=true` to publish the package to test.pypi.org or `/publish-cdk dry-run=false` to publish it to the real index of pypi.org. ## Coming Soon diff --git a/docs/connector-development/cdk-python/README.md b/docs/connector-development/cdk-python/README.md index 45b61591c2ea..206518dff45c 100644 --- a/docs/connector-development/cdk-python/README.md +++ b/docs/connector-development/cdk-python/README.md @@ -94,7 +94,7 @@ All tests are located in the `unit_tests` directory. Run `pytest --cov=airbyte_c 1. Bump the package version in `setup.py` 2. Open a PR -3. An Airbyte member must comment `/publish-cdk dry-run=`. Dry runs publish to test.pypi.org. +3. An Airbyte member must comment `/publish-cdk dry-run=true` to publish the package to test.pypi.org or `/publish-cdk dry-run=false` to publish it to the real index of pypi.org. ## Coming Soon From 035f665a4379adc13b774e53d088cdd927842630 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Mon, 16 Aug 2021 19:16:01 +0300 Subject: [PATCH 05/10] Upd multiple token support: switch to list of tokens --- airbyte-cdk/python/CHANGELOG.md | 3 +++ .../python/airbyte_cdk/sources/streams/http/auth/token.py | 8 +++----- airbyte-cdk/python/setup.py | 2 +- .../unit_tests/sources/streams/http/auth/test_auth.py | 2 +- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 3692b2ac91a5..2809c4f3c987 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.10 +Upd multiple token support: switch to list of tokens + ## 0.1.9 Add multiple token support diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py index 2c6394dbaed6..294e19175d3e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/auth/token.py @@ -24,12 +24,10 @@ from itertools import cycle -from typing import Any, Mapping +from typing import Any, List, Mapping from .core import HttpAuthenticator -TOKEN_SEPARATOR = "," - class TokenAuthenticator(HttpAuthenticator): def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "Authorization"): @@ -42,10 +40,10 @@ def get_auth_header(self) -> Mapping[str, Any]: class MultipleTokenAuthenticator(HttpAuthenticator): - def __init__(self, tokens: str, auth_method: str = "Bearer", auth_header: str = "Authorization"): + def __init__(self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization"): self.auth_method = auth_method self.auth_header = auth_header - self._tokens = [line.strip() for line in tokens.split(TOKEN_SEPARATOR)] + self._tokens = tokens self._tokens_iter = cycle(self._tokens) def get_auth_header(self) -> Mapping[str, Any]: diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 0ef97dfa9af5..693fe163c3b4 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -35,7 +35,7 @@ setup( name="airbyte-cdk", - version="0.1.9", + version="0.1.10", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py b/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py index 21f7ca2ff1cc..3e561af92acb 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/auth/test_auth.py @@ -44,7 +44,7 @@ def test_token_authenticator(): def test_multiple_token_authenticator(): - token = MultipleTokenAuthenticator("token1, token2") + token = MultipleTokenAuthenticator(["token1", "token2"]) header1 = token.get_auth_header() assert {"Authorization": "Bearer token1"} == header1 header2 = token.get_auth_header() From c019bce3ff64bee80b446ab804d5287f37f06682 Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Tue, 17 Aug 2021 11:23:51 +0300 Subject: [PATCH 06/10] 4776: Python CDK: Validate input config.py against spec --- airbyte-cdk/python/CHANGELOG.md | 6 ++ .../airbyte_cdk/destinations/destination.py | 40 ++++++++----- airbyte-cdk/python/airbyte_cdk/entrypoint.py | 8 ++- .../sources/utils/schema_helpers.py | 22 ++++++- airbyte-cdk/python/setup.py | 2 +- .../destinations/test_destination.py | 2 + .../sources/streams/http/test_http.py | 3 +- .../python/unit_tests/test_entrypoint.py | 60 ++++++++++++++----- 8 files changed, 107 insertions(+), 36 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 425948d15da1..63e18883e50d 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.1.10 +Add checking specified config againt spec for read, write, check and ddiscover commands + +##0.1.9 +remove this line after rebase + ## 0.1.8 Allow to fetch primary key info from singer catalog diff --git a/airbyte-cdk/python/airbyte_cdk/destinations/destination.py b/airbyte-cdk/python/airbyte_cdk/destinations/destination.py index 34269f85f8a4..2e0e597a8bf8 100644 --- a/airbyte-cdk/python/airbyte_cdk/destinations/destination.py +++ b/airbyte-cdk/python/airbyte_cdk/destinations/destination.py @@ -30,12 +30,14 @@ from airbyte_cdk import AirbyteLogger from airbyte_cdk.connector import Connector -from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type +from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Type +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec from pydantic import ValidationError class Destination(Connector, ABC): logger = AirbyteLogger() + VALID_CMDS = {"spec", "check", "write"} @abstractmethod def write( @@ -43,24 +45,24 @@ def write( ) -> Iterable[AirbyteMessage]: """Implement to define how the connector writes data to the destination""" - def _run_spec(self) -> AirbyteMessage: - return AirbyteMessage(type=Type.SPEC, spec=self.spec(self.logger)) + def _run_spec(self, spec: ConnectorSpecification) -> AirbyteMessage: + return AirbyteMessage(type=Type.SPEC, spec=spec) - def _run_check(self, config_path: str) -> AirbyteMessage: - config = self.read_config(config_path=config_path) + def _run_check(self, config: Mapping[str, Any]) -> AirbyteMessage: check_result = self.check(self.logger, config) return AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result) def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]: - """ Reads from stdin, converting to Airbyte messages""" + """Reads from stdin, converting to Airbyte messages""" for line in input_stream: try: yield AirbyteMessage.parse_raw(line) except ValidationError: self.logger.info(f"ignoring input which can't be deserialized as Airbyte Message: {line}") - def _run_write(self, config_path: str, configured_catalog_path: str, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]: - config = self.read_config(config_path=config_path) + def _run_write( + self, config: Mapping[str, Any], configured_catalog_path: str, input_stream: io.TextIOWrapper + ) -> Iterable[AirbyteMessage]: catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path) input_messages = self._parse_input_stream(input_stream) self.logger.info("Begin writing to the destination...") @@ -104,18 +106,24 @@ def parse_args(self, args: List[str]) -> argparse.Namespace: def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]: cmd = parsed_args.command + if cmd not in self.VALID_CMDS: + raise Exception(f"Unrecognized command: {cmd}") + + spec = self.spec(self.logger) if cmd == "spec": - yield self._run_spec() - elif cmd == "check": - yield self._run_check(config_path=parsed_args.config) + yield self._run_spec(spec) + return + config = self.read_config(config_path=parsed_args.config) + check_error_msg = check_config_against_spec(config, spec) + if check_error_msg: + self.logger.error(check_error_msg) + return + if cmd == "check": + yield self._run_check(config=config) elif cmd == "write": # Wrap in UTF-8 to override any other input encodings wrapped_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8") - yield from self._run_write( - config_path=parsed_args.config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin - ) - else: - raise Exception(f"Unrecognized command: {cmd}") + yield from self._run_write(config=config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin) def run(self, args: List[str]): parsed_args = self.parse_args(args) diff --git a/airbyte-cdk/python/airbyte_cdk/entrypoint.py b/airbyte-cdk/python/airbyte_cdk/entrypoint.py index 6d714240c8f1..bddaf1663fd7 100644 --- a/airbyte-cdk/python/airbyte_cdk/entrypoint.py +++ b/airbyte-cdk/python/airbyte_cdk/entrypoint.py @@ -33,6 +33,7 @@ from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AirbyteMessage, Status, Type from airbyte_cdk.sources import Source +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec logger = AirbyteLogger() @@ -80,14 +81,19 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: raise Exception("No command passed") # todo: add try catch for exceptions with different exit codes + source_spec = self.source.spec(logger) with tempfile.TemporaryDirectory() as temp_dir: if cmd == "spec": - message = AirbyteMessage(type=Type.SPEC, spec=self.source.spec(logger)) + message = AirbyteMessage(type=Type.SPEC, spec=source_spec) yield message.json(exclude_unset=True) else: raw_config = self.source.read_config(parsed_args.config) config = self.source.configure(raw_config, temp_dir) + check_error_msg = check_config_against_spec(config, source_spec) + if check_error_msg: + logger.error(check_error_msg) + return if cmd == "check": check_result = self.source.check(logger, config) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py index 5ee03697d3d2..cdd26ddc47de 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py @@ -26,10 +26,13 @@ import json import os import pkgutil -from typing import Dict +from typing import Any, Dict, Mapping, Union import pkg_resources -from jsonschema import RefResolver + +from airbyte_cdk.models import ConnectorSpecification +from jsonschema import RefResolver, validate +from jsonschema.exceptions import ValidationError class JsonSchemaResolver: @@ -124,3 +127,18 @@ def get_schema(self, name: str) -> dict: if os.path.exists(shared_schemas_folder): return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema) return raw_schema + + +def check_config_against_spec(config: Mapping[str, Any], spec: ConnectorSpecification) -> Union[str, None]: + """ + Check config object against spec. + :param config - config loaded from file specified over command line + :param spec - spec object generated by connector + :return Error message in case validation failed, None otherwise + """ + spec_schema = spec.connectionSpecification + try: + validate(instance=config, schema=spec_schema) + return None + except ValidationError as validation_error: + return "Config validation error: " + validation_error.message diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index b554ccf9fbb3..693fe163c3b4 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -35,7 +35,7 @@ setup( name="airbyte-cdk", - version="0.1.8", + version="0.1.10", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/destinations/test_destination.py b/airbyte-cdk/python/unit_tests/destinations/test_destination.py index c2438f0e0a74..cb04eca0d0e3 100644 --- a/airbyte-cdk/python/unit_tests/destinations/test_destination.py +++ b/airbyte-cdk/python/unit_tests/destinations/test_destination.py @@ -180,6 +180,7 @@ def test_run_check(self, mocker, destination: Destination, tmp_path): parsed_args = argparse.Namespace(**args) destination.run_cmd(parsed_args) + mocker.patch.object(destination, "spec", return_value=ConnectorSpecification(connectionSpecification={})) expected_check_result = AirbyteConnectionStatus(status=Status.SUCCEEDED) mocker.patch.object(destination, "check", return_value=expected_check_result, autospec=True) @@ -216,6 +217,7 @@ def test_run_write(self, mocker, destination: Destination, tmp_path, monkeypatch mocker.patch.object( destination, "write", return_value=iter(expected_write_result), autospec=True # convert to iterator to mimic real usage ) + mocker.patch.object(destination, "spec", return_value=ConnectorSpecification(connectionSpecification={})) # mock input is a record followed by some state messages mocked_input: List[AirbyteMessage] = [_wrapped(_record("s1", {"k1": "v1"})), *expected_write_result] mocked_stdin_string = "\n".join([record.json(exclude_unset=True) for record in mocked_input]) diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py index cfdd10a5c5e1..88de829486f2 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py @@ -100,7 +100,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, def test_next_page_token_is_input_to_other_methods(mocker): - """ Validates that the return value from next_page_token is passed into other methods that need it like request_params, headers, body, etc..""" + """Validates that the return value from next_page_token is passed into other methods that need it like request_params, headers, body, etc..""" pages = 5 stream = StubNextPageTokenHttpStream(pages=pages) blank_response = {} # Send a blank response is fine as we ignore the response in `parse_response anyway. @@ -144,6 +144,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: def test_stub_custom_backoff_http_stream(mocker): + mocker.patch("time.sleep", lambda x: None) stream = StubCustomBackoffHttpStream() req = requests.Response() req.status_code = 429 diff --git a/airbyte-cdk/python/unit_tests/test_entrypoint.py b/airbyte-cdk/python/unit_tests/test_entrypoint.py index 2992baadd016..3310af1fc482 100644 --- a/airbyte-cdk/python/unit_tests/test_entrypoint.py +++ b/airbyte-cdk/python/unit_tests/test_entrypoint.py @@ -26,6 +26,7 @@ from argparse import Namespace from copy import deepcopy from typing import Any, List, Mapping, MutableMapping, Union +from unittest.mock import MagicMock import pytest from airbyte_cdk import AirbyteEntrypoint @@ -61,6 +62,14 @@ def _as_arglist(cmd: str, named_args: Mapping[str, Any]) -> List[str]: return out +@pytest.fixture +def spec_mock(mocker): + expected = ConnectorSpecification(connectionSpecification={}) + mock = MagicMock(return_value=expected) + mocker.patch.object(MockSource, "spec", mock) + return mock + + @pytest.fixture def entrypoint() -> AirbyteEntrypoint: return AirbyteEntrypoint(MockSource()) @@ -121,40 +130,61 @@ def test_run_spec(entrypoint: AirbyteEntrypoint, mocker): assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args)) -def test_run_check(entrypoint: AirbyteEntrypoint, mocker): - parsed_args = Namespace(command="check", config="config_path") - config = {"username": "fake"} - check_value = AirbyteConnectionStatus(status=Status.SUCCEEDED) +@pytest.fixture +def config_mock(mocker, request): + config = request.param if hasattr(request, "param") else {"username": "fake"} mocker.patch.object(MockSource, "read_config", return_value=config) mocker.patch.object(MockSource, "configure", return_value=config) + return config + + +@pytest.mark.parametrize( + "config_mock, schema, config_valid", + [ + ({"username": "fake"}, {"type": "object", "properties": {"name": {"type": "string"}}, "additionalProperties": False}, False), + ({"username": "fake"}, {"type": "object", "properties": {"username": {"type": "string"}}, "additionalProperties": False}, True), + ({"username": "fake"}, {"type": "object", "properties": {"user": {"type": "string"}}}, True), + ], + indirect=["config_mock"], +) +def test_config_validate(entrypoint: AirbyteEntrypoint, mocker, config_mock, schema, config_valid): + parsed_args = Namespace(command="check", config="config_path") + check_value = AirbyteConnectionStatus(status=Status.SUCCEEDED) + mocker.patch.object(MockSource, "check", return_value=check_value) + mocker.patch.object(MockSource, "spec", return_value=ConnectorSpecification(connectionSpecification=schema)) + messages = list(entrypoint.run(parsed_args)) + if config_valid: + assert [_wrap_message(check_value)] == messages + else: + assert len(messages) == 0 + + +def test_run_check(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock): + parsed_args = Namespace(command="check", config="config_path") + check_value = AirbyteConnectionStatus(status=Status.SUCCEEDED) mocker.patch.object(MockSource, "check", return_value=check_value) assert [_wrap_message(check_value)] == list(entrypoint.run(parsed_args)) + assert spec_mock.called -def test_run_discover(entrypoint: AirbyteEntrypoint, mocker): +def test_run_discover(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock): parsed_args = Namespace(command="discover", config="config_path") - config = {"username": "fake"} expected = AirbyteCatalog(streams=[AirbyteStream(name="stream", json_schema={"k": "v"})]) - mocker.patch.object(MockSource, "read_config", return_value=config) - mocker.patch.object(MockSource, "configure", return_value=config) mocker.patch.object(MockSource, "discover", return_value=expected) assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args)) + assert spec_mock.called -def test_run_read(entrypoint: AirbyteEntrypoint, mocker): +def test_run_read(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock): parsed_args = Namespace(command="read", config="config_path", state="statepath", catalog="catalogpath") - config = {"username": "fake"} expected = AirbyteRecordMessage(stream="stream", data={"data": "stuff"}, emitted_at=1) - mocker.patch.object(MockSource, "read_config", return_value=config) - mocker.patch.object(MockSource, "configure", return_value=config) mocker.patch.object(MockSource, "read_state", return_value={}) mocker.patch.object(MockSource, "read_catalog", return_value={}) mocker.patch.object(MockSource, "read", return_value=[AirbyteMessage(record=expected, type=Type.RECORD)]) assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args)) + assert spec_mock.called -def test_invalid_command(entrypoint: AirbyteEntrypoint, mocker): +def test_invalid_command(entrypoint: AirbyteEntrypoint, mocker, config_mock): with pytest.raises(Exception): - mocker.patch.object(MockSource, "read_config", return_value={}) - mocker.patch.object(MockSource, "configure", return_value={}) list(entrypoint.run(Namespace(command="invalid", config="conf"))) From e3d52796ef02772964609d82d8f9ba55dab91353 Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Tue, 17 Aug 2021 15:28:28 +0300 Subject: [PATCH 07/10] Fix typo --- airbyte-cdk/python/CHANGELOG.md | 2 +- airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 63e18883e50d..bcc0560bac2a 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog ## 0.1.10 -Add checking specified config againt spec for read, write, check and ddiscover commands +Add checking specified config againt spec for read, write, check and discover commands ##0.1.9 remove this line after rebase diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py index cdd26ddc47de..463cc4f3c38b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py @@ -139,6 +139,6 @@ def check_config_against_spec(config: Mapping[str, Any], spec: ConnectorSpecific spec_schema = spec.connectionSpecification try: validate(instance=config, schema=spec_schema) - return None + return except ValidationError as validation_error: return "Config validation error: " + validation_error.message From c97588beb50e57c63173d94e70f488b51faf9e16 Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Wed, 18 Aug 2021 10:48:01 +0300 Subject: [PATCH 08/10] fix review comments --- .../airbyte_cdk/destinations/destination.py | 15 +++++---------- airbyte-cdk/python/airbyte_cdk/entrypoint.py | 7 ++----- .../airbyte_cdk/sources/utils/schema_helpers.py | 15 +++++++++------ airbyte-cdk/python/unit_tests/test_entrypoint.py | 6 ++++-- 4 files changed, 20 insertions(+), 23 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/destinations/destination.py b/airbyte-cdk/python/airbyte_cdk/destinations/destination.py index 2e0e597a8bf8..05f9a8615224 100644 --- a/airbyte-cdk/python/airbyte_cdk/destinations/destination.py +++ b/airbyte-cdk/python/airbyte_cdk/destinations/destination.py @@ -30,8 +30,8 @@ from airbyte_cdk import AirbyteLogger from airbyte_cdk.connector import Connector -from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Type -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec +from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit from pydantic import ValidationError @@ -45,9 +45,6 @@ def write( ) -> Iterable[AirbyteMessage]: """Implement to define how the connector writes data to the destination""" - def _run_spec(self, spec: ConnectorSpecification) -> AirbyteMessage: - return AirbyteMessage(type=Type.SPEC, spec=spec) - def _run_check(self, config: Mapping[str, Any]) -> AirbyteMessage: check_result = self.check(self.logger, config) return AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result) @@ -111,13 +108,11 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]: spec = self.spec(self.logger) if cmd == "spec": - yield self._run_spec(spec) + yield AirbyteMessage(type=Type.SPEC, spec=spec) return config = self.read_config(config_path=parsed_args.config) - check_error_msg = check_config_against_spec(config, spec) - if check_error_msg: - self.logger.error(check_error_msg) - return + check_config_against_spec_or_exit(config, spec, self.logger) + if cmd == "check": yield self._run_check(config=config) elif cmd == "write": diff --git a/airbyte-cdk/python/airbyte_cdk/entrypoint.py b/airbyte-cdk/python/airbyte_cdk/entrypoint.py index bddaf1663fd7..316d1fbba8ed 100644 --- a/airbyte-cdk/python/airbyte_cdk/entrypoint.py +++ b/airbyte-cdk/python/airbyte_cdk/entrypoint.py @@ -33,7 +33,7 @@ from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AirbyteMessage, Status, Type from airbyte_cdk.sources import Source -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit logger = AirbyteLogger() @@ -90,10 +90,7 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: else: raw_config = self.source.read_config(parsed_args.config) config = self.source.configure(raw_config, temp_dir) - check_error_msg = check_config_against_spec(config, source_spec) - if check_error_msg: - logger.error(check_error_msg) - return + check_config_against_spec_or_exit(config, source_spec, logger) if cmd == "check": check_result = self.source.check(logger, config) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py index 463cc4f3c38b..3062cb7c24f2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py @@ -26,7 +26,8 @@ import json import os import pkgutil -from typing import Any, Dict, Mapping, Union +import sys +from typing import Any, Dict, Mapping import pkg_resources @@ -129,16 +130,18 @@ def get_schema(self, name: str) -> dict: return raw_schema -def check_config_against_spec(config: Mapping[str, Any], spec: ConnectorSpecification) -> Union[str, None]: +def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger): """ - Check config object against spec. + Check config object against spec. In case of spec is invalid, throws + SystemExit exception causeing application to make system exit call with + errorcode 1 :param config - config loaded from file specified over command line :param spec - spec object generated by connector - :return Error message in case validation failed, None otherwise + :param logger - Airbyte logger for reporting validation error """ spec_schema = spec.connectionSpecification try: validate(instance=config, schema=spec_schema) - return except ValidationError as validation_error: - return "Config validation error: " + validation_error.message + logger.error("Config validation error: " + validation_error.message) + sys.exit(1) diff --git a/airbyte-cdk/python/unit_tests/test_entrypoint.py b/airbyte-cdk/python/unit_tests/test_entrypoint.py index 3310af1fc482..efe1fef51628 100644 --- a/airbyte-cdk/python/unit_tests/test_entrypoint.py +++ b/airbyte-cdk/python/unit_tests/test_entrypoint.py @@ -152,11 +152,13 @@ def test_config_validate(entrypoint: AirbyteEntrypoint, mocker, config_mock, sch check_value = AirbyteConnectionStatus(status=Status.SUCCEEDED) mocker.patch.object(MockSource, "check", return_value=check_value) mocker.patch.object(MockSource, "spec", return_value=ConnectorSpecification(connectionSpecification=schema)) - messages = list(entrypoint.run(parsed_args)) if config_valid: + messages = list(entrypoint.run(parsed_args)) assert [_wrap_message(check_value)] == messages else: - assert len(messages) == 0 + with pytest.raises(SystemExit) as ex_info: + list(entrypoint.run(parsed_args)) + assert ex_info.value.code == 1 def test_run_check(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock): From 289681651480befb9da75a963f28ead5f93a46c9 Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Wed, 18 Aug 2021 10:48:44 +0300 Subject: [PATCH 09/10] fix review comments --- airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py index 3062cb7c24f2..85726f11a38a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py @@ -27,6 +27,7 @@ import os import pkgutil import sys +from logging import Logger from typing import Any, Dict, Mapping import pkg_resources @@ -130,7 +131,7 @@ def get_schema(self, name: str) -> dict: return raw_schema -def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger): +def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: Logger): """ Check config object against spec. In case of spec is invalid, throws SystemExit exception causeing application to make system exit call with From e29552ecddb608573bf9719d1dec30638e909a10 Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Thu, 19 Aug 2021 12:56:02 +0300 Subject: [PATCH 10/10] fix review comment --- .../airbyte_cdk/sources/utils/schema_helpers.py | 6 +++--- .../unit_tests/destinations/test_destination.py | 13 ++++++++++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py index 85726f11a38a..b33fb8b76afa 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py @@ -27,11 +27,11 @@ import os import pkgutil import sys -from logging import Logger from typing import Any, Dict, Mapping import pkg_resources +from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ConnectorSpecification from jsonschema import RefResolver, validate from jsonschema.exceptions import ValidationError @@ -131,10 +131,10 @@ def get_schema(self, name: str) -> dict: return raw_schema -def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: Logger): +def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: AirbyteLogger): """ Check config object against spec. In case of spec is invalid, throws - SystemExit exception causeing application to make system exit call with + SystemExit exception causing application to make system exit call with errorcode 1 :param config - config loaded from file specified over command line :param spec - spec object generated by connector diff --git a/airbyte-cdk/python/unit_tests/destinations/test_destination.py b/airbyte-cdk/python/unit_tests/destinations/test_destination.py index cb04eca0d0e3..a9d604aa4288 100644 --- a/airbyte-cdk/python/unit_tests/destinations/test_destination.py +++ b/airbyte-cdk/python/unit_tests/destinations/test_destination.py @@ -179,8 +179,9 @@ def test_run_check(self, mocker, destination: Destination, tmp_path): parsed_args = argparse.Namespace(**args) destination.run_cmd(parsed_args) - - mocker.patch.object(destination, "spec", return_value=ConnectorSpecification(connectionSpecification={})) + spec_msg = ConnectorSpecification(connectionSpecification={}) + mocker.patch.object(destination, "spec", return_value=spec_msg) + validate_mock = mocker.patch("airbyte_cdk.destinations.destination.check_config_against_spec_or_exit") expected_check_result = AirbyteConnectionStatus(status=Status.SUCCEEDED) mocker.patch.object(destination, "check", return_value=expected_check_result, autospec=True) @@ -190,6 +191,8 @@ def test_run_check(self, mocker, destination: Destination, tmp_path): destination.check.assert_called_once() # type: ignore # Affirm to Mypy that this is indeed a method on this mock destination.check.assert_called_with(logger=ANY, config=dummy_config) # type: ignore + # Check if config validation has been called + validate_mock.assert_called_with(dummy_config, spec_msg, destination.logger) # verify output was correct assert _wrapped(expected_check_result) == returned_check_result @@ -217,7 +220,9 @@ def test_run_write(self, mocker, destination: Destination, tmp_path, monkeypatch mocker.patch.object( destination, "write", return_value=iter(expected_write_result), autospec=True # convert to iterator to mimic real usage ) - mocker.patch.object(destination, "spec", return_value=ConnectorSpecification(connectionSpecification={})) + spec_msg = ConnectorSpecification(connectionSpecification={}) + mocker.patch.object(destination, "spec", return_value=spec_msg) + validate_mock = mocker.patch("airbyte_cdk.destinations.destination.check_config_against_spec_or_exit") # mock input is a record followed by some state messages mocked_input: List[AirbyteMessage] = [_wrapped(_record("s1", {"k1": "v1"})), *expected_write_result] mocked_stdin_string = "\n".join([record.json(exclude_unset=True) for record in mocked_input]) @@ -238,6 +243,8 @@ def test_run_write(self, mocker, destination: Destination, tmp_path, monkeypatch # that iterates over two iterables to check equality input_messages=OrderedIterableMatcher(mocked_input), ) + # Check if config validation has been called + validate_mock.assert_called_with(dummy_config, spec_msg, destination.logger) # verify output was correct assert expected_write_result == returned_write_result