Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[low-code cdk] break resolving reference preprocessing into its own class so it can be reused #19517

Merged
merged 8 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.9.4
Low-code: Fix reference resolution for connector builder

## 0.9.3
Low-code: Avoid duplicate HTTP query in `simple_retriever`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.streams.core import Stream
from dataclasses_jsonschema import JsonSchemaMixin
Expand All @@ -47,7 +48,10 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False):
:param debug(bool): True if debug mode is enabled
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")
self._source_config = source_config

evaluated_manifest = {}
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config, evaluated_manifest, "")
self._source_config = resolved_source_config
self._debug = debug
self._factory = DeclarativeComponentFactory()

Expand Down Expand Up @@ -135,8 +139,8 @@ def _stream_configs(self):

@staticmethod
def generate_schema() -> str:
expanded_source_definition = ManifestDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
expanded_schema = expanded_source_definition.json_schema()
expanded_source_manifest = ManifestDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
expanded_schema = expanded_source_manifest.json_schema()
return json.dumps(expanded_schema, cls=SchemaEncoder)

@staticmethod
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@
from copy import deepcopy
from typing import Any, Mapping, Tuple, Union

import yaml
from airbyte_cdk.sources.declarative.parsers.config_parser import ConnectionDefinitionParser
from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException
from airbyte_cdk.sources.declarative.types import ConnectionDefinition


class YamlParser(ConnectionDefinitionParser):
class ManifestReferenceResolver:
"""
Parses a Yaml string to a ConnectionDefinition

In addition to standard Yaml parsing, the input_string can contain references to values previously defined.
An incoming manifest can contain references to values previously defined.
This parser will dereference these values to produce a complete ConnectionDefinition.

References can be defined using a *ref(<arg>) string.
Expand Down Expand Up @@ -101,31 +96,20 @@ class YamlParser(ConnectionDefinitionParser):

ref_tag = "$ref"

def parse(self, connection_definition_str: str) -> ConnectionDefinition:
"""
Parses a yaml file and dereferences string in the form "*ref({reference)"
to {reference}
:param connection_definition_str: yaml string to parse
:return: The ConnectionDefinition parsed from connection_definition_str
"""
input_mapping = yaml.safe_load(connection_definition_str)
evaluated_definition = {}
return self._preprocess_dict(input_mapping, evaluated_definition, "")

def _preprocess_dict(self, input_mapping: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]):
def preprocess_manifest(self, manifest: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]):

"""
:param input_mapping: mapping produced by parsing yaml
:param manifest: incoming manifest that could have references to previously defined components
:param evaluated_mapping: mapping produced by dereferencing the content of input_mapping
:param path: curent path in configuration traversal
:return:
"""
d = {}
if self.ref_tag in input_mapping:
partial_ref_string = input_mapping[self.ref_tag]
if self.ref_tag in manifest:
partial_ref_string = manifest[self.ref_tag]
d = deepcopy(self._preprocess(partial_ref_string, evaluated_mapping, path))

for key, value in input_mapping.items():
for key, value in manifest.items():
if key == self.ref_tag:
continue
full_path = self._resolve_value(key, path)
Expand Down Expand Up @@ -180,7 +164,7 @@ def _preprocess(self, value, evaluated_config: Mapping[str, Any], path):
key = *key[:-1], split[0], ".".join(split[1:])
raise UndefinedReferenceException(path, ref_key)
elif isinstance(value, dict):
return self._preprocess_dict(value, evaluated_config, path)
return self.preprocess_manifest(value, evaluated_config, path)
elif type(value) == list:
evaluated_list = [
# pass in elem's path instead of the list's path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

import pkgutil

import yaml
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
from airbyte_cdk.sources.declarative.types import ConnectionDefinition


Expand All @@ -25,8 +25,18 @@ def _read_and_parse_yaml_file(self, path_to_yaml_file) -> ConnectionDefinition:

yaml_config = pkgutil.get_data(package, path_to_yaml_file)
decoded_yaml = yaml_config.decode()
return YamlParser().parse(decoded_yaml)
return self._parse(decoded_yaml)

def _emit_manifest_debug_message(self, extra_args: dict):
extra_args["path_to_yaml"] = self._path_to_yaml
self.logger.debug("declarative source created from parsed YAML manifest", extra=extra_args)

@staticmethod
def _parse(connection_definition_str: str) -> ConnectionDefinition:
"""
Parses a yaml file into a manifest. Component references still exist in the manifest which will be
resolved during the creating of the DeclarativeSource.
:param connection_definition_str: yaml string to parse
:return: The ConnectionDefinition parsed from connection_definition_str
"""
return yaml.safe_load(connection_definition_str)
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.9.3",
version="0.9.4",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pytest
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException

resolver = ManifestReferenceResolver()


def test_get_ref():
s = "*ref(limit)"
ref_key = resolver._get_ref_key(s)
assert ref_key == "limit"


def test_get_ref_no_ref():
s = "limit: 50"

ref_key = resolver._get_ref_key(s)
assert ref_key is None


def test_refer():
content = {
"limit": 50,
"limit_ref": "*ref(limit)"
}
config = resolver.preprocess_manifest(content, {}, "")
assert config["limit_ref"] == 50


def test_refer_to_inner():
content = {
"dict": {
"limit": 50
},
"limit_ref": "*ref(dict.limit)"
}
config = resolver.preprocess_manifest(content, {}, "")
assert config["limit_ref"] == 50


def test_refer_to_non_existant_struct():
content = {
"dict": {
"limit": 50
},
"limit_ref": "*ref(not_dict)"
}
with pytest.raises(UndefinedReferenceException):
resolver.preprocess_manifest(content, {}, "")


def test_refer_in_dict():
content = {
"limit": 50,
"offset_request_parameters": {
"offset": "{{ next_page_token['offset'] }}",
"limit": "*ref(limit)"
}
}
config = resolver.preprocess_manifest(content, {}, "")
assert config["offset_request_parameters"]["offset"] == "{{ next_page_token['offset'] }}"
assert config["offset_request_parameters"]["limit"] == 50


def test_refer_to_dict():
content = {
"limit": 50,
"offset_request_parameters": {
"offset": "{{ next_page_token['offset'] }}",
"limit": "*ref(limit)"
},
"offset_pagination_request_parameters": {
"class": "InterpolatedRequestParameterProvider",
"request_parameters": "*ref(offset_request_parameters)"
}
}
config = resolver.preprocess_manifest(content, {}, "")
assert config["limit"] == 50
assert config["offset_request_parameters"]["limit"] == 50
assert len(config["offset_pagination_request_parameters"]) == 2
assert config["offset_pagination_request_parameters"]["request_parameters"]["limit"] == 50
assert config["offset_pagination_request_parameters"]["request_parameters"]["offset"] == "{{ next_page_token['offset'] }}"


def test_refer_and_overwrite():
content = {
"limit": 50,
"custom_limit": 25,
"offset_request_parameters": {
"offset": "{{ next_page_token['offset'] }}",
"limit": "*ref(limit)"
},
"custom_request_parameters": {
"$ref": "*ref(offset_request_parameters)",
"limit": "*ref(custom_limit)"
}
}
config = resolver.preprocess_manifest(content, {}, "")
assert config["offset_request_parameters"]["limit"] == 50
assert config["custom_request_parameters"]["limit"] == 25

assert config["offset_request_parameters"]["offset"] == "{{ next_page_token['offset'] }}"
assert config["custom_request_parameters"]["offset"] == "{{ next_page_token['offset'] }}"


def test_collision():
content = {
"example": {
"nested":{
"path": "first one",
"more_nested": {
"value": "found it!"
}
},
"nested.path": "uh oh",
},
"reference_to_nested_path": {
"$ref": "*ref(example.nested.path)"
},
"reference_to_nested_nested_value": {
"$ref": "*ref(example.nested.more_nested.value)"
}
}
config = resolver.preprocess_manifest(content, {}, "")
assert config["example"]["nested"]["path"] == "first one"
assert config["example"]["nested.path"] == "uh oh"
assert config["reference_to_nested_path"] == "uh oh"
assert config["example"]["nested"]["more_nested"]["value"] == "found it!"
assert config["reference_to_nested_nested_value"] == "found it!"


def test_list():
content = {
"list": ["A", "B"],
"elem_ref": "*ref(list[0])"
}
config = resolver.preprocess_manifest(content, {}, "")
elem_ref = config["elem_ref"]
assert elem_ref == "A"
Loading