Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4776: Python CDK: Validate input config.py against spec #5457

Merged
merged 12 commits into from
Aug 19, 2021
6 changes: 6 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 0.1.10
Add checking of the specified config against the spec for the read, write, check and discover commands
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Add checking specified config againt spec for read, write, check and ddiscover commands
Add checking specified config against spec for read, write, check and discover commands

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated


## 0.1.9
remove this line after rebase
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rudiment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's because Artur has already published his MR as version 0.1.9, so mine is 0.1.10: https://pypi.org/project/airbyte-cdk/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm waiting until he is ready; then I will rebase my changes onto his and update this file


## 0.1.8
Allow to fetch primary key info from singer catalog

Expand Down
40 changes: 24 additions & 16 deletions airbyte-cdk/python/airbyte_cdk/destinations/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,39 @@

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.connector import Connector
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Type
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec
from pydantic import ValidationError


class Destination(Connector, ABC):
logger = AirbyteLogger()
VALID_CMDS = {"spec", "check", "write"}

@abstractmethod
def write(
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
"""Implement to define how the connector writes data to the destination"""

def _run_spec(self) -> AirbyteMessage:
return AirbyteMessage(type=Type.SPEC, spec=self.spec(self.logger))
def _run_spec(self, spec: ConnectorSpecification) -> AirbyteMessage:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should delete this method, calling it _run_spec when it just wraps an object is misleading

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

return AirbyteMessage(type=Type.SPEC, spec=spec)

def _run_check(self, config_path: str) -> AirbyteMessage:
config = self.read_config(config_path=config_path)
def _run_check(self, config: Mapping[str, Any]) -> AirbyteMessage:
check_result = self.check(self.logger, config)
return AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result)

def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]:
""" Reads from stdin, converting to Airbyte messages"""
"""Reads from stdin, converting to Airbyte messages"""
for line in input_stream:
try:
yield AirbyteMessage.parse_raw(line)
except ValidationError:
self.logger.info(f"ignoring input which can't be deserialized as Airbyte Message: {line}")

def _run_write(self, config_path: str, configured_catalog_path: str, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]:
config = self.read_config(config_path=config_path)
def _run_write(
self, config: Mapping[str, Any], configured_catalog_path: str, input_stream: io.TextIOWrapper
) -> Iterable[AirbyteMessage]:
catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
input_messages = self._parse_input_stream(input_stream)
self.logger.info("Begin writing to the destination...")
Expand Down Expand Up @@ -104,18 +106,24 @@ def parse_args(self, args: List[str]) -> argparse.Namespace:

def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
cmd = parsed_args.command
if cmd not in self.VALID_CMDS:
raise Exception(f"Unrecognized command: {cmd}")

spec = self.spec(self.logger)
if cmd == "spec":
yield self._run_spec()
elif cmd == "check":
yield self._run_check(config_path=parsed_args.config)
yield self._run_spec(spec)
return
config = self.read_config(config_path=parsed_args.config)
check_error_msg = check_config_against_spec(config, spec)
if check_error_msg:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an invalid config should cause the exit code to be >0, it's invalid input. An exit code of 0 means the connector succeeded

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

self.logger.error(check_error_msg)
return
if cmd == "check":
yield self._run_check(config=config)
elif cmd == "write":
# Wrap in UTF-8 to override any other input encodings
wrapped_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
yield from self._run_write(
config_path=parsed_args.config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin
)
else:
raise Exception(f"Unrecognized command: {cmd}")
yield from self._run_write(config=config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin)

def run(self, args: List[str]):
parsed_args = self.parse_args(args)
Expand Down
8 changes: 7 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, Status, Type
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec

logger = AirbyteLogger()

Expand Down Expand Up @@ -80,14 +81,19 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
raise Exception("No command passed")

# todo: add try catch for exceptions with different exit codes
source_spec = self.source.spec(logger)

with tempfile.TemporaryDirectory() as temp_dir:
if cmd == "spec":
message = AirbyteMessage(type=Type.SPEC, spec=self.source.spec(logger))
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
yield message.json(exclude_unset=True)
else:
raw_config = self.source.read_config(parsed_args.config)
config = self.source.configure(raw_config, temp_dir)
check_error_msg = check_config_against_spec(config, source_spec)
if check_error_msg:
logger.error(check_error_msg)
return

if cmd == "check":
check_result = self.source.check(logger, config)
Expand Down
22 changes: 20 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import json
import os
import pkgutil
from typing import Dict
from typing import Any, Dict, Mapping, Union

import pkg_resources
from jsonschema import RefResolver

from airbyte_cdk.models import ConnectorSpecification
from jsonschema import RefResolver, validate
from jsonschema.exceptions import ValidationError


class JsonSchemaResolver:
Expand Down Expand Up @@ -124,3 +127,18 @@ def get_schema(self, name: str) -> dict:
if os.path.exists(shared_schemas_folder):
return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema)
return raw_schema


def check_config_against_spec(config: Mapping[str, Any], spec: ConnectorSpecification) -> Union[str, None]:
"""
Check config object against spec.
:param config - config loaded from file specified over command line
:param spec - spec object generated by connector
:return Error message in case validation failed, None otherwise
"""
spec_schema = spec.connectionSpecification
try:
validate(instance=config, schema=spec_schema)
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return None
return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

except ValidationError as validation_error:
return "Config validation error: " + validation_error.message
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.8",
version="0.1.10",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you'll need to either merge this with arthur's changes or release separately

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've merged Arthur's branch into mine, so this PR will look dirty until his branch is merged into master.

description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def test_run_check(self, mocker, destination: Destination, tmp_path):
parsed_args = argparse.Namespace(**args)
destination.run_cmd(parsed_args)

mocker.patch.object(destination, "spec", return_value=ConnectorSpecification(connectionSpecification={}))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should add a test case here too to verify the config is validated.

It may help to look at test coverage by running ./type_check_and_test.sh

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a detailed test case that covers the validation function here: https://github.com/airbytehq/airbyte/pull/5457/files#diff-0d9087e19f5bdabe137529bb861aaaa261b39d5344e79041602a2a94c7530301R150 So I've just added checks that this validation function is called with the correct args for the destination.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done

expected_check_result = AirbyteConnectionStatus(status=Status.SUCCEEDED)
mocker.patch.object(destination, "check", return_value=expected_check_result, autospec=True)

Expand Down Expand Up @@ -216,6 +217,7 @@ def test_run_write(self, mocker, destination: Destination, tmp_path, monkeypatch
mocker.patch.object(
destination, "write", return_value=iter(expected_write_result), autospec=True # convert to iterator to mimic real usage
)
mocker.patch.object(destination, "spec", return_value=ConnectorSpecification(connectionSpecification={}))
# mock input is a record followed by some state messages
mocked_input: List[AirbyteMessage] = [_wrapped(_record("s1", {"k1": "v1"})), *expected_write_result]
mocked_stdin_string = "\n".join([record.json(exclude_unset=True) for record in mocked_input])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,


def test_next_page_token_is_input_to_other_methods(mocker):
""" Validates that the return value from next_page_token is passed into other methods that need it like request_params, headers, body, etc.."""
"""Validates that the return value from next_page_token is passed into other methods that need it like request_params, headers, body, etc.."""
pages = 5
stream = StubNextPageTokenHttpStream(pages=pages)
blank_response = {} # Send a blank response is fine as we ignore the response in `parse_response anyway.
Expand Down Expand Up @@ -144,6 +144,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:


def test_stub_custom_backoff_http_stream(mocker):
mocker.patch("time.sleep", lambda x: None)
stream = StubCustomBackoffHttpStream()
req = requests.Response()
req.status_code = 429
Expand Down
60 changes: 45 additions & 15 deletions airbyte-cdk/python/unit_tests/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from argparse import Namespace
from copy import deepcopy
from typing import Any, List, Mapping, MutableMapping, Union
from unittest.mock import MagicMock

import pytest
from airbyte_cdk import AirbyteEntrypoint
Expand Down Expand Up @@ -61,6 +62,14 @@ def _as_arglist(cmd: str, named_args: Mapping[str, Any]) -> List[str]:
return out


@pytest.fixture
def spec_mock(mocker):
    """Patch ``MockSource.spec`` with a mock that returns an empty-schema spec.

    The mock itself is returned so tests can inspect whether (and how) it was called.
    """
    spec_stub = MagicMock(return_value=ConnectorSpecification(connectionSpecification={}))
    mocker.patch.object(MockSource, "spec", spec_stub)
    return spec_stub


@pytest.fixture
def entrypoint() -> AirbyteEntrypoint:
    """Provide an ``AirbyteEntrypoint`` built around a fresh ``MockSource`` instance."""
    source = MockSource()
    return AirbyteEntrypoint(source)
Expand Down Expand Up @@ -121,40 +130,61 @@ def test_run_spec(entrypoint: AirbyteEntrypoint, mocker):
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))


def test_run_check(entrypoint: AirbyteEntrypoint, mocker):
parsed_args = Namespace(command="check", config="config_path")
config = {"username": "fake"}
check_value = AirbyteConnectionStatus(status=Status.SUCCEEDED)
@pytest.fixture
def config_mock(mocker, request):
    """Patch ``MockSource.read_config`` and ``MockSource.configure`` to return one config.

    The config is taken from ``request.param`` when the fixture is parametrized
    indirectly; otherwise a default ``{"username": "fake"}`` mapping is used.
    The chosen config is returned to the test.
    """
    cfg = getattr(request, "param", {"username": "fake"})
    # Both the raw read and the configure step hand back the same mapping.
    for method_name in ("read_config", "configure"):
        mocker.patch.object(MockSource, method_name, return_value=cfg)
    return cfg


@pytest.mark.parametrize(
    "config_mock, schema, config_valid",
    [
        ({"username": "fake"}, {"type": "object", "properties": {"name": {"type": "string"}}, "additionalProperties": False}, False),
        ({"username": "fake"}, {"type": "object", "properties": {"username": {"type": "string"}}, "additionalProperties": False}, True),
        ({"username": "fake"}, {"type": "object", "properties": {"user": {"type": "string"}}}, True),
    ],
    indirect=["config_mock"],
)
def test_config_validate(entrypoint: AirbyteEntrypoint, mocker, config_mock, schema, config_valid):
    """Run the ``check`` command against a spec built from ``schema``.

    When the config is valid for the schema, the check result is the only
    message emitted; when it is not, no messages are emitted at all.
    """
    connection_status = AirbyteConnectionStatus(status=Status.SUCCEEDED)
    mocker.patch.object(MockSource, "check", return_value=connection_status)
    mocker.patch.object(MockSource, "spec", return_value=ConnectorSpecification(connectionSpecification=schema))

    cli_args = Namespace(command="check", config="config_path")
    emitted = list(entrypoint.run(cli_args))

    # An invalid config must short-circuit the run before anything is emitted.
    expected = [_wrap_message(connection_status)] if config_valid else []
    assert expected == emitted


def test_run_check(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock):
    """The ``check`` command emits the source's connection status and consults the spec."""
    connection_status = AirbyteConnectionStatus(status=Status.SUCCEEDED)
    mocker.patch.object(MockSource, "check", return_value=connection_status)

    cli_args = Namespace(command="check", config="config_path")
    emitted = list(entrypoint.run(cli_args))

    assert [_wrap_message(connection_status)] == emitted
    # The spec must have been fetched at some point during the run.
    assert spec_mock.called


def test_run_discover(entrypoint: AirbyteEntrypoint, mocker):
def test_run_discover(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock):
parsed_args = Namespace(command="discover", config="config_path")
config = {"username": "fake"}
expected = AirbyteCatalog(streams=[AirbyteStream(name="stream", json_schema={"k": "v"})])
mocker.patch.object(MockSource, "read_config", return_value=config)
mocker.patch.object(MockSource, "configure", return_value=config)
mocker.patch.object(MockSource, "discover", return_value=expected)
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))
assert spec_mock.called


def test_run_read(entrypoint: AirbyteEntrypoint, mocker):
def test_run_read(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock):
parsed_args = Namespace(command="read", config="config_path", state="statepath", catalog="catalogpath")
config = {"username": "fake"}
expected = AirbyteRecordMessage(stream="stream", data={"data": "stuff"}, emitted_at=1)
mocker.patch.object(MockSource, "read_config", return_value=config)
mocker.patch.object(MockSource, "configure", return_value=config)
mocker.patch.object(MockSource, "read_state", return_value={})
mocker.patch.object(MockSource, "read_catalog", return_value={})
mocker.patch.object(MockSource, "read", return_value=[AirbyteMessage(record=expected, type=Type.RECORD)])
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))
assert spec_mock.called


def test_invalid_command(entrypoint: AirbyteEntrypoint, mocker):
def test_invalid_command(entrypoint: AirbyteEntrypoint, mocker, config_mock):
with pytest.raises(Exception):
mocker.patch.object(MockSource, "read_config", return_value={})
mocker.patch.object(MockSource, "configure", return_value={})
list(entrypoint.run(Namespace(command="invalid", config="conf")))