Skip to content

Commit

Permalink
[ISSUE #15628] apply lookback window on earliest datetime between start and cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 committed Dec 6, 2022
1 parent 772a01a commit 9043b67
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,28 +145,19 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
kwargs = {"stream_state": stream_state}
end_datetime = min(self.end_datetime.get_datetime(self.config, **kwargs), datetime.datetime.now(tz=self._timezone))
lookback_delta = self._parse_timedelta(self.lookback_window.eval(self.config, **kwargs) if self.lookback_window else "0d")
start_datetime = self.start_datetime.get_datetime(self.config, **kwargs) - lookback_delta
start_datetime = min(start_datetime, end_datetime)
if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:
cursor_datetime = self.parse_date(stream_state[self.cursor_field.eval(self.config)])
else:
cursor_datetime = start_datetime

start_datetime = max(cursor_datetime, start_datetime)

state_cursor_value = stream_state.get(self.cursor_field.eval(self.config, stream_state=stream_state))
earliest_possible_start_datetime = min(self.start_datetime.get_datetime(self.config, **kwargs), end_datetime)
cursor_datetime = self._calculate_cursor_datetime_from_state(stream_state)
start_datetime = max(earliest_possible_start_datetime, cursor_datetime) - lookback_delta

if state_cursor_value:
state_date = self.parse_date(state_cursor_value)
else:
state_date = None
if state_date:
# If the input_state's date is greater than start_datetime, the start of the time window is the state's next day
next_date = state_date + datetime.timedelta(days=1)
start_datetime = max(start_datetime, next_date)
dates = self._partition_daterange(start_datetime, end_datetime, self._step)
return dates

def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime:
    """
    Compute the datetime from which the next sync should resume, based on the saved state.

    The cursor field name is evaluated once, with the stream state available for interpolation, and that
    same key is used for both the membership check and the lookup so the two can never disagree.

    :param stream_state: state saved by a previous sync
    :return: the parsed cursor value plus one day when the cursor field is present in the state; otherwise
        the minimum representable datetime, made timezone-aware in UTC
    """
    cursor_key = self.cursor_field.eval(self.config, stream_state=stream_state)
    if cursor_key in stream_state:
        # Resume on the day after the stored cursor value.
        return self.parse_date(stream_state[cursor_key]) + datetime.timedelta(days=1)
    # No cursor in the state: return a timezone-aware sentinel that sorts before any real datetime.
    return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

def _format_datetime(self, dt: datetime.datetime):
    """
    Render a datetime as text using this object's parser and configured datetime format.

    :param dt: the datetime to render
    :return: whatever self._parser.format produces for dt and self.datetime_format
    """
    formatter = self._parser
    return formatter.format(dt, self.datetime_format)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@
timezone = datetime.timezone.utc


class MockedNowDatetime(datetime.datetime):
    """A datetime.datetime subclass whose now() always yields the module-level FAKE_NOW."""

    @classmethod
    def now(cls, tz=None):
        """Return FAKE_NOW; tz is accepted to match datetime.datetime.now but is not used."""
        frozen_instant = FAKE_NOW
        return frozen_instant


@pytest.fixture()
def mock_datetime_now(monkeypatch):
    """
    Replace datetime.datetime with MockedNowDatetime for the duration of a test.

    The replacement is a real datetime.datetime subclass, so only now() is overridden. The patch is
    registered through the monkeypatch fixture.
    """
    monkeypatch.setattr(datetime, "datetime", MockedNowDatetime)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -286,6 +290,22 @@ def mock_datetime_now(monkeypatch):
{"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"},
],
),
(
"test_with_lookback_window_from_cursor",
{cursor_field: "2021-01-05T00:00:00.000000+0000"},
MinMaxDatetime(datetime="2021-01-01T00:00:00.000000+0000", options={}),
MinMaxDatetime(datetime="2021-01-06T00:00:00.000000+0000", options={}),
"1d",
cursor_field,
"3d",
datetime_format,
[
{"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T00:00:00.000000+0000"},
{"start_time": "2021-01-04T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"},
{"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"},
{"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"},
],
),
(
"test_with_lookback_window_defaults_to_0d",
{"date": "2021-01-05"},
Expand Down

0 comments on commit 9043b67

Please sign in to comment.