Skip to content

Commit

Permalink
🐞 CDK: properly emit state on empty slices when using iterators (#17296)
Browse files Browse the repository at this point in the history
* fix: emit state on empty slices iterator

* bump version / update changelog

* format
  • Loading branch information
pedroslopez authored Sep 28, 2022
1 parent 4d9c585 commit fe2238f
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 7 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.89
- Fix: properly emit state when a stream's slices are provided by an iterator that yields no slices

## 0.1.88
- Bugfix: Evaluate `response.text` only in debug mode

Expand Down
12 changes: 8 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,11 @@ def _read_incremental(
stream_state=stream_state,
)
logger.debug(f"Processing stream slices for {stream_name}", extra={"stream_slices": slices})

total_records_counter = 0
if not slices:
# Safety net to ensure we always emit at least one state message even if there are no slices
checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager)
yield checkpoint
has_slices = False
for _slice in slices:
has_slices = True
logger.debug("Processing stream slice", extra={"slice": _slice})
records = stream_instance.read_records(
sync_mode=SyncMode.incremental,
Expand All @@ -261,6 +260,11 @@ def _read_incremental(
if self._limit_reached(internal_config, total_records_counter):
return

if not has_slices:
# Safety net to ensure we always emit at least one state message even if there are no slices
checkpoint = self._checkpoint_state(stream_instance, stream_state, state_manager)
yield checkpoint

def _read_full_refresh(
self,
logger: logging.Logger,
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.88",
version="0.1.89",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
5 changes: 3 additions & 2 deletions airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,8 @@ def test_with_slices(self, mocker, use_legacy, per_stream_enabled):
pytest.param(False, id="test_source_emits_state_as_per_stream_format"),
],
)
def test_no_slices(self, mocker, use_legacy, per_stream_enabled):
@pytest.mark.parametrize("slices", [pytest.param([], id="test_slices_as_list"), pytest.param(iter([]), id="test_slices_as_iterator")])
def test_no_slices(self, mocker, use_legacy, per_stream_enabled, slices):
"""
Tests that an incremental read returns at least one state message even if no records were read:
1. outputs a state message after reading the entire stream
Expand All @@ -615,7 +616,7 @@ def test_no_slices(self, mocker, use_legacy, per_stream_enabled):
input_state = defaultdict(dict)
else:
input_state = []
slices = []

stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}]
state = {"cursor": "value"}
stream_1 = MockStreamWithState(
Expand Down

0 comments on commit fe2238f

Please sign in to comment.