Skip to content

Commit

Permalink
[ISSUE #20322] add datetime_granularity logic to DatetimeStreamSlicer… (
Browse files Browse the repository at this point in the history
#20717)

* [ISSUE #20322] add datetime_granularity logic to DatetimeStreamSlicer and migrate duration to ISO8601

* [ISSUE #20322] fix tests

* [ISSUE #20322] code review based on clnoll's comments and fixed tests

* [ISSUE #20322] fix flake8 error

* [ISSUE #20322] fix source tests

* [ISSUE #20322] fixing yet another error in source

* [ISSUE #20322] code review

* [ISSUE #20322] adding new sources using datetime slicer

* [ISSUE #20322] fixing source-datascope and increasing version

* [ISSUE #20322] regenerate component schema

* [ISSUE #20322] fixing source-datascope

* [ISSUE #20322] extra field error
  • Loading branch information
maxi297 authored Jan 6, 2023
1 parent b526aac commit 4f0aca5
Show file tree
Hide file tree
Showing 41 changed files with 305 additions and 241 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.18.0
Adding `cursor_granularity` to the declarative API of DatetimeStreamSlicer

## 0.17.0
Add utility class to infer schemas from real records

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ definitions:
- cursor_field
- end_datetime
- datetime_format
- cursor_granularity
- start_datetime
- step
properties:
Expand All @@ -340,6 +341,8 @@ definitions:
type: string
datetime_format:
type: string
cursor_granularity:
type: string
end_datetime:
anyOf:
- type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ class DatetimeStreamSlicer(BaseModel):
type: Literal["DatetimeStreamSlicer"]
cursor_field: str
datetime_format: str
cursor_granularity: str
end_datetime: Union[str, MinMaxDatetime]
start_datetime: Union[str, MinMaxDatetime]
step: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#

import datetime
import re
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, Optional, Union

Expand All @@ -16,7 +15,7 @@
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
from dateutil.relativedelta import relativedelta
from isodate import Duration, parse_duration


@dataclass
Expand All @@ -27,16 +26,7 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
Given a start time, end time, a step function, and an optional lookback window,
the stream slicer will partition the date range from start time - lookback window to end time.
The step function is defined as a string of the form:
`"<number><unit>"`
where unit can be one of
- years, y
- months, m
- weeks, w
- days, d
For example, "1d" will produce windows of 1 day, and "2w" windows of 2 weeks.
The step is defined as an ISO 8601 duration string (for example, "P1D" produces windows of 1 day).
The timestamp format accepts the same format codes as datetime.strftime and datetime.strptime, which are
all the format codes required by the 1989 C standard.
Expand All @@ -45,22 +35,24 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
Attributes:
start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
end_datetime (Union[MinMaxDatetime, str]): the datetime that determines the last record that should be synced
step (str): size of the timewindow
step (str): size of the timewindow (ISO8601 duration)
cursor_field (Union[InterpolatedString, str]): record's cursor field
datetime_format (str): format of the datetime
cursor_granularity (str): smallest increment the datetime_format has (ISO 8601 duration) that will be used to ensure that the start of a slice does not overlap with the end of the previous one
config (Config): connection config
start_time_option (Optional[RequestOption]): request option for start time
end_time_option (Optional[RequestOption]): request option for end time
stream_state_field_start (Optional[str]): stream slice start time field
stream_state_field_end (Optional[str]): stream slice end time field
lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for
lookback_window (Optional[InterpolatedString]): how far before start_datetime to read data for (ISO8601 duration)
"""

start_datetime: Union[MinMaxDatetime, str]
end_datetime: Union[MinMaxDatetime, str]
step: str
cursor_field: Union[InterpolatedString, str]
datetime_format: str
cursor_granularity: str
config: Config
options: InitVar[Mapping[str, Any]]
_cursor: dict = field(repr=False, default=None) # tracks current datetime
Expand All @@ -71,10 +63,6 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
stream_state_field_end: Optional[str] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None

timedelta_regex = re.compile(
r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$"
)

def __post_init__(self, options: Mapping[str, Any]):
if not isinstance(self.start_datetime, MinMaxDatetime):
self.start_datetime = MinMaxDatetime(self.start_datetime, options)
Expand All @@ -85,6 +73,7 @@ def __post_init__(self, options: Mapping[str, Any]):
self._interpolation = JinjaInterpolation()

self._step = self._parse_timedelta(self.step)
self._cursor_granularity = self._parse_timedelta(self.cursor_granularity)
self.cursor_field = InterpolatedString.create(self.cursor_field, options=options)
self.stream_slice_field_start = InterpolatedString.create(self.stream_state_field_start or "start_time", options=options)
self.stream_slice_field_end = InterpolatedString.create(self.stream_state_field_end or "end_time", options=options)
Expand Down Expand Up @@ -144,7 +133,7 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
stream_state = stream_state or {}
kwargs = {"stream_state": stream_state}
end_datetime = min(self.end_datetime.get_datetime(self.config, **kwargs), datetime.datetime.now(tz=self._timezone))
lookback_delta = self._parse_timedelta(self.lookback_window.eval(self.config, **kwargs) if self.lookback_window else "0d")
lookback_delta = self._parse_timedelta(self.lookback_window.eval(self.config, **kwargs) if self.lookback_window else "P0D")

earliest_possible_start_datetime = min(self.start_datetime.get_datetime(self.config, **kwargs), end_datetime)
cursor_datetime = self._calculate_cursor_datetime_from_state(stream_state)
Expand All @@ -154,18 +143,18 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->

def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime:
if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:
return self.parse_date(stream_state[self.cursor_field.eval(self.config)]) + datetime.timedelta(days=1)
return self.parse_date(stream_state[self.cursor_field.eval(self.config)])
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

def _format_datetime(self, dt: datetime.datetime):
return self._parser.format(dt, self.datetime_format)

def _partition_daterange(self, start, end, step: datetime.timedelta):
def _partition_daterange(self, start: datetime.datetime, end: datetime.datetime, step: Union[datetime.timedelta, Duration]):
start_field = self.stream_slice_field_start.eval(self.config)
end_field = self.stream_slice_field_end.eval(self.config)
dates = []
while start <= end:
end_date = self._get_date(start + step - datetime.timedelta(days=1), end, min)
end_date = self._get_date(start + step - self._cursor_granularity, end, min)
dates.append({start_field: self._format_datetime(start), end_field: self._format_datetime(end_date)})
start += step
return dates
Expand All @@ -178,19 +167,13 @@ def parse_date(self, date: str) -> datetime.datetime:
return self._parser.parse(date, self.datetime_format, self._timezone)

@classmethod
def _parse_timedelta(cls, time_str):
def _parse_timedelta(cls, time_str) -> Union[datetime.timedelta, Duration]:
"""
Parse a time string e.g. (2h13m) into a timedelta object.
Modified from virhilo's answer at https://stackoverflow.com/a/4628148/851699
:param time_str: A string identifying a duration. (eg. 2h13m)
:return relativedelta: A relativedelta object
:return: The ISO 8601 duration parsed into a datetime.timedelta or Duration object.
"""
parts = cls.timedelta_regex.match(time_str)

assert parts is not None

time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
return relativedelta(**time_params)
if not time_str:
return datetime.timedelta(0)
return parse_duration(time_str)

def get_request_params(
self,
Expand Down
3 changes: 2 additions & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.17.0",
version="0.18.0",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -48,6 +48,7 @@
# pinned to the last working version for us temporarily while we fix
"dataclasses-jsonschema==2.15.1",
"dpath~=2.0.1",
"isodate~=0.6.1",
"jsonschema~=3.2.0",
"jsonref~=0.2",
"pendulum",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def test_simple_retriever_with_request_response_logs(mock_http_stream):
paginator = MagicMock()
record_selector = MagicMock()
iterator = DatetimeStreamSlicer(
start_datetime="", end_datetime="", step="1d", cursor_field="id", datetime_format="", config={}, options={}
start_datetime="", end_datetime="", step="P1D", cursor_field="id", datetime_format="", cursor_granularity="P1D", config={}, options={}
)

retriever = SimpleRetriever(
Expand Down Expand Up @@ -153,7 +153,7 @@ def test_simple_retriever_with_request_response_log_last_records(mock_http_strea
record_selector.select_records.return_value = request_response_logs
response = requests.Response()
iterator = DatetimeStreamSlicer(
start_datetime="", end_datetime="", step="1d", cursor_field="id", datetime_format="", config={}, options={}
start_datetime="", end_datetime="", step="P1D", cursor_field="id", datetime_format="", cursor_granularity="P1D", config={}, options={}
)

retriever = SimpleRetriever(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@
DatetimeStreamSlicer(
start_datetime=MinMaxDatetime(datetime="2021-01-01", datetime_format="%Y-%m-%d", options={}),
end_datetime=MinMaxDatetime(datetime="2021-01-03", datetime_format="%Y-%m-%d", options={}),
step="1d",
step="P1D",
cursor_field=InterpolatedString.create("", options={}),
datetime_format="%Y-%m-%d",
cursor_granularity="P1D",
config={},
options={},
),
Expand Down Expand Up @@ -87,9 +88,10 @@ def test_update_cursor(test_name, stream_slice, expected_state):
DatetimeStreamSlicer(
start_datetime=MinMaxDatetime(datetime="2021-01-01", datetime_format="%Y-%m-%d", options={}),
end_datetime=MinMaxDatetime(datetime="2021-01-03", datetime_format="%Y-%m-%d", options={}),
step="1d",
step="P1D",
cursor_field=InterpolatedString(string="date", options={}),
datetime_format="%Y-%m-%d",
cursor_granularity="P1D",
config={},
options={},
),
Expand Down
Loading

0 comments on commit 4f0aca5

Please sign in to comment.