From 9507d56be9b73086eab812c84515c48d8bc91f5d Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 8 Aug 2022 19:02:02 -0700 Subject: [PATCH] low-code connectors: fix parse and format methods (#15326) * fix parse and format methods * define constant * remove timestamp magic keyword * comment * test for ci * uncomment test * use timestamp() * Bump cdk version * bump to 0.1.72 --- airbyte-cdk/python/CHANGELOG.md | 3 + .../declarative/datetime/min_max_datetime.py | 4 ++ .../stream_slicers/datetime_stream_slicer.py | 37 ++++++------ airbyte-cdk/python/setup.py | 2 +- .../test_datetime_stream_slicer.py | 56 +++++++++++++++++++ 5 files changed, 80 insertions(+), 22 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 6213e864784e..7f0fa5574a95 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.72 +- Bugfix: Fix bug in DatetimeStreamSlicer's parsing method + ## 0.1.71 - Refactor declarative package to dataclasses - Bugfix: Requester header always converted to string diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py index d58ca36631c7..0c4b5232cf69 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py @@ -17,6 +17,10 @@ class MinMaxDatetime(JsonSchemaMixin): min_date, then min_date is returned. If date is greater than max_date, then max_date is returned. If neither, the input date is returned. + The timestamp format accepts the same format codes as datetime.strptime and datetime.strftime, which are + all the format codes required by the 1989 C standard. 
+ Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html + Attributes: datetime (Union[InterpolatedString, str]): InterpolatedString or string representing the datetime in the format specified by `datetime_format` datetime_format (str): Format of the datetime passed as argument diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index 351c4be8b2d4..c81d11e85129 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -5,9 +5,8 @@ import datetime import re from dataclasses import InitVar, dataclass, field -from typing import Any, Iterable, Mapping, Optional +from typing import Any, Iterable, Mapping, Optional, Union -import dateutil from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString @@ -35,6 +34,10 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin): For example, "1d" will produce windows of 1 day, and 2weeks windows of 2 weeks. + The timestamp format accepts the same format codes as datetime.strptime and datetime.strftime, which are + all the format codes required by the 1989 C standard. 
+ Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html + Attributes: start_datetime (MinMaxDatetime): the datetime that determines the earliest record that should be synced end_datetime (MinMaxDatetime): the datetime that determines the last record that should be synced @@ -128,7 +131,7 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> """ stream_state = stream_state or {} kwargs = {"stream_state": stream_state} - end_datetime = min(self.end_datetime.get_datetime(self.config, **kwargs), datetime.datetime.now(tz=datetime.timezone.utc)) + end_datetime = min(self.end_datetime.get_datetime(self.config, **kwargs), datetime.datetime.now(tz=self._timezone)) lookback_delta = self._parse_timedelta(self.lookback_window.eval(self.config, **kwargs) if self.lookback_window else "0d") start_datetime = self.start_datetime.get_datetime(self.config, **kwargs) - lookback_delta start_datetime = min(start_datetime, end_datetime) @@ -148,8 +151,11 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> return dates def _format_datetime(self, dt: datetime.datetime): - if self.datetime_format == "timestamp": - return dt.timestamp() + # strftime("%s") is unreliable because it ignores the time zone information and assumes the time zone of the system it's running on + # It's safer to use the timestamp() method than the %s directive + # See https://stackoverflow.com/a/4974930 + if self.datetime_format == "%s": + return str(int(dt.timestamp())) else: return dt.strftime(self.datetime_format) @@ -167,22 +173,11 @@ def _get_date(self, cursor_value, default_date: datetime.datetime, comparator) - cursor_date = self.parse_date(cursor_value or default_date) return comparator(cursor_date, default_date) - def parse_date(self, date: Any) -> datetime: - if date and isinstance(date, str): - if self.is_int(date): - return datetime.datetime.fromtimestamp(int(date)).replace(tzinfo=self._timezone) - else: - 
return dateutil.parser.parse(date).replace(tzinfo=self._timezone) - elif isinstance(date, int): - return datetime.datetime.fromtimestamp(int(date)).replace(tzinfo=self._timezone) - return date - - def is_int(self, s) -> bool: - try: - int(s) - return True - except ValueError: - return False + def parse_date(self, date: Union[str, datetime.datetime]) -> datetime.datetime: + if isinstance(date, str): + return datetime.datetime.strptime(str(date), self.datetime_format).replace(tzinfo=self._timezone) + else: + return date @classmethod def _parse_timedelta(cls, time_str): diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 960bb620b55b..ff3a17f01051 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.71", + version="0.1.72", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py index df1aa811ce11..e2321ad607f2 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py @@ -445,5 +445,61 @@ def test_request_option(test_name, inject_into, field_name, expected_req_params, assert expected_body_data == slicer.get_request_body_data(stream_slice=stream_slice) +@pytest.mark.parametrize( + "test_name, input_date, date_format, expected_output_date", + [ + ( + "test_parse_date_iso", + "2021-01-01T00:00:00.000000+0000", + "%Y-%m-%dT%H:%M:%S.%f%z", + datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), + ), + ("test_parse_date_number", "20210101", "%Y%m%d", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)), + ( + 
"test_parse_date_datetime", + datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), + "%Y%m%d", + datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), + ), + ], +) +def test_parse_date(test_name, input_date, date_format, expected_output_date): + slicer = DatetimeStreamSlicer( + start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", options={}), + end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000", options={}), + step="1d", + cursor_field=InterpolatedString(cursor_field, options={}), + datetime_format=date_format, + lookback_window=InterpolatedString("0d", options={}), + config=config, + options={}, + ) + output_date = slicer.parse_date(input_date) + assert expected_output_date == output_date + + +@pytest.mark.parametrize( + "test_name, input_dt, datetimeformat, expected_output", + [ + ("test_format_timestamp", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%s", "1609459200"), + ("test_format_string", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%Y-%m-%d", "2021-01-01"), + ], +) +def test_format_datetime(test_name, input_dt, datetimeformat, expected_output): + slicer = DatetimeStreamSlicer( + start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", options={}), + end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000", options={}), + step="1d", + cursor_field=InterpolatedString(cursor_field, options={}), + datetime_format=datetimeformat, + lookback_window=InterpolatedString("0d", options={}), + config=config, + options={}, + ) + + output_date = slicer._format_datetime(input_dt) + assert expected_output == output_date + + if __name__ == "__main__": unittest.main()