Skip to content

Commit

Permalink
low-code connectors: fix parse and format methods (#15326)
Browse files Browse the repository at this point in the history
* fix parse and format methods

* define constant

* remove timestamp magic keyword

* comment

* test for ci

* uncomment test

* use timestamp()

* Bump cdk version

* bump to 0.1.72
  • Loading branch information
girarda authored Aug 9, 2022
1 parent fd0b769 commit 9507d56
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 22 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.72
- Bugfix: Fix bug in DatetimeStreamSlicer's parsing method

## 0.1.71
- Refactor declarative package to dataclasses
- Bugfix: Requester header always converted to string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class MinMaxDatetime(JsonSchemaMixin):
min_date, then min_date is returned. If date is greater than max_date, then max_date is returned.
If neither, the input date is returned.
The timestamp format accepts the same format codes as datetime.strfptime, which are
all the format codes required by the 1989 C standard.
Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html
Attributes:
datetime (Union[InterpolatedString, str]): InterpolatedString or string representing the datetime in the format specified by `datetime_format`
datetime_format (str): Format of the datetime passed as argument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
import datetime
import re
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, Optional
from typing import Any, Iterable, Mapping, Optional, Union

import dateutil
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
Expand Down Expand Up @@ -35,6 +34,10 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
For example, "1d" will produce windows of 1 day, and 2weeks windows of 2 weeks.
The timestamp format accepts the same format codes as datetime.strfptime, which are
all the format codes required by the 1989 C standard.
Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html
Attributes:
start_datetime (MinMaxDatetime): the datetime that determines the earliest record that should be synced
end_datetime (MinMaxDatetime): the datetime that determines the last record that should be synced
Expand Down Expand Up @@ -128,7 +131,7 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
"""
stream_state = stream_state or {}
kwargs = {"stream_state": stream_state}
end_datetime = min(self.end_datetime.get_datetime(self.config, **kwargs), datetime.datetime.now(tz=datetime.timezone.utc))
end_datetime = min(self.end_datetime.get_datetime(self.config, **kwargs), datetime.datetime.now(tz=self._timezone))
lookback_delta = self._parse_timedelta(self.lookback_window.eval(self.config, **kwargs) if self.lookback_window else "0d")
start_datetime = self.start_datetime.get_datetime(self.config, **kwargs) - lookback_delta
start_datetime = min(start_datetime, end_datetime)
Expand All @@ -148,8 +151,11 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
return dates

def _format_datetime(self, dt: datetime.datetime):
if self.datetime_format == "timestamp":
return dt.timestamp()
# strftime("%s") is unreliable because it ignores the time zone information and assumes the time zone of the system it's running on
# It's safer to use the timestamp() method than the %s directive
# See https://stackoverflow.com/a/4974930
if self.datetime_format == "%s":
return str(int(dt.timestamp()))
else:
return dt.strftime(self.datetime_format)

Expand All @@ -167,22 +173,11 @@ def _get_date(self, cursor_value, default_date: datetime.datetime, comparator) -
cursor_date = self.parse_date(cursor_value or default_date)
return comparator(cursor_date, default_date)

def parse_date(self, date: Any) -> datetime:
if date and isinstance(date, str):
if self.is_int(date):
return datetime.datetime.fromtimestamp(int(date)).replace(tzinfo=self._timezone)
else:
return dateutil.parser.parse(date).replace(tzinfo=self._timezone)
elif isinstance(date, int):
return datetime.datetime.fromtimestamp(int(date)).replace(tzinfo=self._timezone)
return date

def is_int(self, s) -> bool:
try:
int(s)
return True
except ValueError:
return False
def parse_date(self, date: Union[str, datetime.datetime]) -> datetime.datetime:
if isinstance(date, str):
return datetime.datetime.strptime(str(date), self.datetime_format).replace(tzinfo=self._timezone)
else:
return date

@classmethod
def _parse_timedelta(cls, time_str):
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.71",
version="0.1.72",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,5 +445,61 @@ def test_request_option(test_name, inject_into, field_name, expected_req_params,
assert expected_body_data == slicer.get_request_body_data(stream_slice=stream_slice)


@pytest.mark.parametrize(
"test_name, input_date, date_format, expected_output_date",
[
(
"test_parse_date_iso",
"2021-01-01T00:00:00.000000+0000",
"%Y-%m-%dT%H:%M:%S.%f%z",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
),
("test_parse_date_number", "20210101", "%Y%m%d", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)),
(
"test_parse_date_datetime",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
"%Y%m%d",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
),
],
)
def test_parse_date(test_name, input_date, date_format, expected_output_date):
slicer = DatetimeStreamSlicer(
start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", options={}),
end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000", options={}),
step="1d",
cursor_field=InterpolatedString(cursor_field, options={}),
datetime_format=date_format,
lookback_window=InterpolatedString("0d", options={}),
config=config,
options={},
)
output_date = slicer.parse_date(input_date)
assert expected_output_date == output_date


@pytest.mark.parametrize(
"test_name, input_dt, datetimeformat, expected_output",
[
("test_format_timestamp", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%s", "1609459200"),
("test_format_string", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%Y-%m-%d", "2021-01-01"),
],
)
def test_format_datetime(test_name, input_dt, datetimeformat, expected_output):
slicer = DatetimeStreamSlicer(
start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", options={}),
end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000", options={}),
step="1d",
cursor_field=InterpolatedString(cursor_field, options={}),
datetime_format=datetimeformat,
lookback_window=InterpolatedString("0d", options={}),
config=config,
options={},
)

output_date = slicer._format_datetime(input_dt)
assert expected_output == output_date


if __name__ == "__main__":
unittest.main()

0 comments on commit 9507d56

Please sign in to comment.