Skip to content

Commit

Permalink
[airbytehq#17107] Fix propagation of $options field in arrays (airbyt…
Browse files Browse the repository at this point in the history
…ehq#17196)

* input can only be a string

* remove debug print

* cleanup tests

* fix in the factory

* Add a unit test

* fix for lists

* Update changelog

* Update changelog
  • Loading branch information
girarda authored and jhammarstedt committed Oct 31, 2022
1 parent 5eb5ef4 commit 06a01b5
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 10 deletions.
4 changes: 4 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.1.92
- Low-code: Properly propagate $options to array items
- Low-code: Log request and response when running check operation in debug mode

## 0.1.91
- Low-code: Rename LimitPaginator to DefaultPaginator and move page_size field to PaginationStrategy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,11 @@ def build(self, class_or_class_name: Union[str, Type], config, instantiate: bool
class_ = self._get_class_from_fully_qualified_class_name(class_or_class_name)
else:
class_ = class_or_class_name

# create components in options before propagating them
if OPTIONS_STR in kwargs:
kwargs[OPTIONS_STR] = {
k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs[OPTIONS_STR].items()
}

updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}

if instantiate:
Expand Down Expand Up @@ -216,7 +214,7 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class, in
self._create_subcomponent(
key,
sub,
self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)),
kwargs,
config,
parent_class,
instantiate,
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.91",
version="0.1.92",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
60 changes: 54 additions & 6 deletions airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def test_full_config():
primary_key: "id"
extractor:
$ref: "*ref(extractor)"
field_pointer: ["result"]
field_pointer: ["{{ options['name'] }}"]
retriever:
$ref: "*ref(retriever)"
requester:
Expand Down Expand Up @@ -365,7 +365,7 @@ def test_full_config():
assert type(stream.retriever.record_selector) == RecordSelector
assert type(stream.retriever.record_selector.extractor.decoder) == JsonDecoder

assert [fp.eval(input_config) for fp in stream.retriever.record_selector.extractor.field_pointer] == ["result"]
assert [fp.eval(input_config) for fp in stream.retriever.record_selector.extractor.field_pointer] == ["lists"]
assert type(stream.retriever.record_selector.record_filter) == RecordFilter
assert stream.retriever.record_selector.record_filter._filter_interpolator.condition == "{{ record['id'] > stream_state['id'] }}"
assert stream.schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.json"
Expand All @@ -378,18 +378,24 @@ def test_full_config():
assert stream.retriever.requester.path.default == "marketing/lists"


def test_create_record_selector():
content = """
@pytest.mark.parametrize(
"test_name, record_selector, expected_runtime_selector",
[("test_static_record_selector", "result", "result"), ("test_options_record_selector", "{{ options['name'] }}", "lists")],
)
def test_create_record_selector(test_name, record_selector, expected_runtime_selector):
content = f"""
extractor:
type: DpathExtractor
selector:
$options:
name: "lists"
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
record_filter:
class_name: airbyte_cdk.sources.declarative.extractors.record_filter.RecordFilter
condition: "{{ record['id'] > stream_state['id'] }}"
extractor:
$ref: "*ref(extractor)"
field_pointer: ["result"]
field_pointer: ["{record_selector}"]
"""
config = parser.parse(content)

Expand All @@ -398,10 +404,52 @@ def test_create_record_selector():
selector = factory.create_component(config["selector"], input_config)()
assert isinstance(selector, RecordSelector)
assert isinstance(selector.extractor, DpathExtractor)
assert [fp.eval(input_config) for fp in selector.extractor.field_pointer] == ["result"]
assert [fp.eval(input_config) for fp in selector.extractor.field_pointer] == [expected_runtime_selector]
assert isinstance(selector.record_filter, RecordFilter)


@pytest.mark.parametrize(
"test_name, content, expected_field_pointer_value",
[
(
"test_option_in_selector",
"""
extractor:
type: DpathExtractor
field_pointer: ["{{ options['name'] }}"]
selector:
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
$options:
name: "selector"
extractor: "*ref(extractor)"
""",
"selector",
),
(
"test_option_in_extractor",
"""
extractor:
type: DpathExtractor
$options:
name: "extractor"
field_pointer: ["{{ options['name'] }}"]
selector:
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
$options:
name: "selector"
extractor: "*ref(extractor)"
""",
"extractor",
),
],
)
def test_options_propagation(test_name, content, expected_field_pointer_value):
config = parser.parse(content)

selector = factory.create_component(config["selector"], input_config, True)()
assert selector.extractor.field_pointer[0].eval(input_config) == expected_field_pointer_value


def test_create_requester():
content = """
requester:
Expand Down

0 comments on commit 06a01b5

Please sign in to comment.