From fe2238f3f74ba189731388409fcd38d0f01e742f Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Wed, 28 Sep 2022 02:12:04 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9E=20CDK:=20properly=20emit=20state?= =?UTF-8?q?=20on=20empty=20slices=20when=20using=20iterators=20(#17296)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: emit state on empty slices iterator * bump version / update changelog * format --- airbyte-cdk/python/CHANGELOG.md | 3 +++ .../python/airbyte_cdk/sources/abstract_source.py | 12 ++++++++---- airbyte-cdk/python/setup.py | 2 +- .../unit_tests/sources/test_abstract_source.py | 5 +++-- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index ccecde28ccff..b3878ed0e324 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.89 +- Fix: properly emit state when a stream has empty slices, provided by an iterator + ## 0.1.88 - Bugfix: Evaluate `response.text` only in debug mode diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 8612beffaba3..b416c29fa95a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -229,12 +229,11 @@ def _read_incremental( stream_state=stream_state, ) logger.debug(f"Processing stream slices for {stream_name}", extra={"stream_slices": slices}) + total_records_counter = 0 - if not slices: - # Safety net to ensure we always emit at least one state message even if there are no slices - checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager) - yield checkpoint + has_slices = False for _slice in slices: + has_slices = True logger.debug("Processing stream slice", extra={"slice": _slice}) records = stream_instance.read_records( sync_mode=SyncMode.incremental, @@ -261,6 +260,11 @@ def _read_incremental( if self._limit_reached(internal_config, total_records_counter): return + if not has_slices: + # Safety net to ensure we always emit at least one state message even if there are no slices + checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager) + yield checkpoint + def _read_full_refresh( self, logger: logging.Logger, diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 607956a65eab..c0a9eaee6119 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.88", + version="0.1.89", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py index 42bf28450341..7696a058383f 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py @@ -606,7 +606,8 @@ def test_with_slices(self, mocker, use_legacy, per_stream_enabled): pytest.param(False, id="test_source_emits_state_as_per_stream_format"), ], ) - def test_no_slices(self, mocker, use_legacy, per_stream_enabled): + @pytest.mark.parametrize("slices", [pytest.param([], id="test_slices_as_list"), pytest.param(iter([]), id="test_slices_as_iterator")]) + def test_no_slices(self, mocker, use_legacy, per_stream_enabled, slices): """ Tests that an incremental read returns at least one state messages even if no records were read: 1. outputs a state message after reading the entire stream @@ -615,7 +616,7 @@ def test_no_slices(self, mocker, use_legacy, per_stream_enabled): input_state = defaultdict(dict) else: input_state = [] - slices = [] + stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}] state = {"cursor": "value"} stream_1 = MockStreamWithState(