diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 2df5255f8265..56264df727a7 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.69 +- AbstractSource emits a state message when reading incremental even if there were no stream slices to process. + ## 0.1.68 - Replace parse-time string interpolation with run-time interpolation in YAML-based sources diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 78199a181f92..e4585ae7c2fc 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -226,6 +226,10 @@ def _read_incremental( ) logger.debug(f"Processing stream slices for {stream_name}", extra={"stream_slices": slices}) total_records_counter = 0 + if not slices: + # Safety net to ensure we always emit at least one state message even if there are no slices + checkpoint = self._checkpoint_state(stream_instance, stream_instance.state, connector_state) + yield checkpoint for _slice in slices: logger.debug("Processing stream slice", extra={"slice": _slice}) records = stream_instance.read_records( diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index e538af4b91c1..87467b9c2f62 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.68", + version="0.1.69", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py index 77b7e81a55dd..7bae0e7a163b 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py @@ -98,9 +98,13 @@ def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: class MockStreamWithState(MockStream): cursor_field = "cursor" + def __init__(self, inputs_and_mocked_outputs: List[Tuple[Mapping[str, Any], Iterable[Mapping[str, Any]]]], name: str, state=None): + super().__init__(inputs_and_mocked_outputs, name) + self._state = state + @property def state(self): - return {} + return self._state @state.setter def state(self, value): @@ -452,6 +456,74 @@ def test_with_slices(self, mocker): assert expected == messages + def test_no_slices(self, mocker): + """ + Tests that an incremental read returns at least one state messages even if no records were read: + 1. outputs a state message after reading the entire stream + """ + slices = [] + stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}] + state = {"cursor": "value"} + s1 = MockStreamWithState( + [ + ( + { + "sync_mode": SyncMode.incremental, + "stream_slice": s, + "stream_state": mocker.ANY, + }, + stream_output, + ) + for s in slices + ], + name="s1", + state=state, + ) + s2 = MockStreamWithState( + [ + ( + { + "sync_mode": SyncMode.incremental, + "stream_slice": s, + "stream_state": mocker.ANY, + }, + stream_output, + ) + for s in slices + ], + name="s2", + state=state, + ) + + mocker.patch.object(MockStreamWithState, "supports_incremental", return_value=True) + mocker.patch.object(MockStreamWithState, "get_json_schema", return_value={}) + mocker.patch.object(MockStreamWithState, "stream_slices", return_value=slices) + mocker.patch.object( + MockStreamWithState, + "state_checkpoint_interval", + new_callable=mocker.PropertyMock, + return_value=2, + ) + + src = MockSource(streams=[s1, s2]) + catalog = ConfiguredAirbyteCatalog( + streams=[ + _configured_stream(s1, SyncMode.incremental), + _configured_stream(s2, SyncMode.incremental), + ] + ) + + expected = [ + _state({"s1": state}), + _state({"s1": state, "s2": state}), + ] + + messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict)))) + + print(f"expected:\n{expected}") + print(f"messages:\n{messages}") + assert expected == messages + def test_with_slices_and_interval(self, mocker): """ Tests that an incremental read which uses slices and a checkpoint interval: