From 9043b671b6744c0a5e2b213f6be8c063299f59d9 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Tue, 6 Dec 2022 13:41:18 -0500 Subject: [PATCH] [ISSUE #15628] apply lookback window on earliest datetime between start and cursor --- .../stream_slicers/datetime_stream_slicer.py | 25 ++++++------------ .../test_datetime_stream_slicer.py | 26 ++++++++++++++++--- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index 181ddc096d99..66bac76d9421 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -145,28 +145,19 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> kwargs = {"stream_state": stream_state} end_datetime = min(self.end_datetime.get_datetime(self.config, **kwargs), datetime.datetime.now(tz=self._timezone)) lookback_delta = self._parse_timedelta(self.lookback_window.eval(self.config, **kwargs) if self.lookback_window else "0d") - start_datetime = self.start_datetime.get_datetime(self.config, **kwargs) - lookback_delta - start_datetime = min(start_datetime, end_datetime) - if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state: - cursor_datetime = self.parse_date(stream_state[self.cursor_field.eval(self.config)]) - else: - cursor_datetime = start_datetime - - start_datetime = max(cursor_datetime, start_datetime) - state_cursor_value = stream_state.get(self.cursor_field.eval(self.config, stream_state=stream_state)) + earliest_possible_start_datetime = min(self.start_datetime.get_datetime(self.config, **kwargs), end_datetime) + cursor_datetime = self._calculate_cursor_datetime_from_state(stream_state) + start_datetime = max(earliest_possible_start_datetime, cursor_datetime) - lookback_delta - if state_cursor_value: - state_date = self.parse_date(state_cursor_value) - else: - state_date = None - if state_date: - # If the input_state's date is greater than start_datetime, the start of the time window is the state's next day - next_date = state_date + datetime.timedelta(days=1) - start_datetime = max(start_datetime, next_date) dates = self._partition_daterange(start_datetime, end_datetime, self._step) return dates + def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime: + if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state: + return self.parse_date(stream_state[self.cursor_field.eval(self.config)]) + datetime.timedelta(days=1) + return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) + def _format_datetime(self, dt: datetime.datetime): return self._parser.format(dt, self.datetime_format) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py index 1bee16057667..e86216c809c8 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py @@ -21,11 +21,15 @@ timezone = datetime.timezone.utc +class MockedNowDatetime(datetime.datetime): + @classmethod + def now(cls, tz=None): + return FAKE_NOW + + @pytest.fixture() def mock_datetime_now(monkeypatch): - datetime_mock = unittest.mock.MagicMock(wraps=datetime.datetime) - datetime_mock.now.return_value = FAKE_NOW - monkeypatch.setattr(datetime, "datetime", datetime_mock) + monkeypatch.setattr(datetime, "datetime", MockedNowDatetime) @pytest.mark.parametrize( @@ -286,6 +290,22 @@ def mock_datetime_now(monkeypatch): {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, ], ), + ( + "test_with_lookback_window_from_cursor", + {cursor_field: "2021-01-05T00:00:00.000000+0000"}, + MinMaxDatetime(datetime="2021-01-01T00:00:00.000000+0000", options={}), + MinMaxDatetime(datetime="2021-01-06T00:00:00.000000+0000", options={}), + "1d", + cursor_field, + "3d", + datetime_format, + [ + {"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T00:00:00.000000+0000"}, + {"start_time": "2021-01-04T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + ], + ), ( "test_with_lookback_window_defaults_to_0d", {"date": "2021-01-05"},