From de3603b174c59c88ec8b2e973cfac9a05881db74 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 1 Jun 2022 14:30:02 +0300 Subject: [PATCH 1/7] fixed --- .../connectors/source-amplitude/Dockerfile | 2 +- .../source_amplitude/schemas/annotations.json | 2 +- .../source_amplitude/schemas/events.json | 21 +++++++------------ 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/airbyte-integrations/connectors/source-amplitude/Dockerfile b/airbyte-integrations/connectors/source-amplitude/Dockerfile index 047cbc6e5f39..247936e15209 100644 --- a/airbyte-integrations/connectors/source-amplitude/Dockerfile +++ b/airbyte-integrations/connectors/source-amplitude/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/source-amplitude diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/annotations.json b/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/annotations.json index 012beba10fe5..c58f3073d47c 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/annotations.json +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/annotations.json @@ -4,7 +4,7 @@ "properties": { "date": { "type": ["null", "string"], - "format": "date-time" + "format": "date" }, "details": { "type": ["null", "string"] diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/events.json b/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/events.json index dcc2aa12b5fd..e4ce93589456 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/events.json +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/events.json @@ -3,8 +3,7 @@ "type": "object", "properties": { "server_received_time": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "app": { "type": ["null", "integer"] @@ -25,8 +24,7 @@ "type": ["null", "string"] }, "event_time": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "platform": { "type": ["null", "string"] @@ -38,12 +36,10 @@ "type": ["null", "integer"] }, "processed_time": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "user_creation_time": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "version_name": { "type": ["null", "string"] @@ -64,8 +60,7 @@ "type": ["null", "object"] }, "client_upload_time": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "$insert_id": { "type": ["null", "string"] @@ -92,8 +87,7 @@ "type": ["null", "number"] }, "server_upload_time": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "event_id": { "type": ["null", "integer"] @@ -153,8 +147,7 @@ "type": ["null", "string"] }, "client_event_time": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] } } } From 1498faab2ec2b783434a49a0991854555d788884 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 1 Jun 2022 14:34:01 +0300 Subject: [PATCH 2/7] updated readme --- airbyte-integrations/connectors/source-amplitude/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-amplitude/README.md b/airbyte-integrations/connectors/source-amplitude/README.md index fefa6fb4a013..6b321632d41d 100644 --- a/airbyte-integrations/connectors/source-amplitude/README.md +++ b/airbyte-integrations/connectors/source-amplitude/README.md @@ -99,7 +99,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Source Acce If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. To run your integration tests with acceptance tests, from the connector root, run ``` -python -m pytest integration_tests -p integration_tests.acceptance +docker build . --no-cache -t airbyte/source-amplitude:dev \ +&& python -m pytest -p source_acceptance_test.plugin ``` To run your integration tests with docker From 33022df5032180955ff6aeeadb192acff7225dc2 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 1 Jun 2022 14:38:23 +0300 Subject: [PATCH 3/7] updated changelog --- docs/integrations/sources/amplitude.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/integrations/sources/amplitude.md b/docs/integrations/sources/amplitude.md index ee153d8c78af..4b452adfb0b8 100644 --- a/docs/integrations/sources/amplitude.md +++ b/docs/integrations/sources/amplitude.md @@ -59,7 +59,8 @@ The Amplitude connector should gracefully handle Amplitude API limitations under | Version | Date | Pull Request | Subject | |:--------| :--------- | :----------------------------------------------------- | :------ | -| 0.1.6 | 2022-05-21 | [13074](https://github.com/airbytehq/airbyte/pull/13074) | Removed time offset for `Events` stream, which caused a lot of duplicated records | +| 0.1.8 | 2022-06-01 | [13373](https://github.com/airbytehq/airbyte/pull/13373) | Fixed the issue when JSON Validator produces erros on `date-time` check | +| 0.1.7 | 2022-05-21 | [13074](https://github.com/airbytehq/airbyte/pull/13074) | Removed time offset for `Events` stream, which caused a lot of duplicated records | | 0.1.6 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy | | 0.1.5 | 2022-04-28 | [12430](https://github.com/airbytehq/airbyte/pull/12430) | Added HTTP error descriptions and fixed `Events` stream fail caused by `404` HTTP Error | | 0.1.4 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications | From eb90e20331b8698777279da8e002a32b049f3ecf Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Thu, 2 Jun 2022 12:28:57 +0300 Subject: [PATCH 4/7] reverted schema change --- .../source_amplitude/schemas/events.json | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/events.json b/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/events.json index e4ce93589456..dcc2aa12b5fd 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/events.json +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/schemas/events.json @@ -3,7 +3,8 @@ "type": "object", "properties": { "server_received_time": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "app": { "type": ["null", "integer"] @@ -24,7 +25,8 @@ "type": ["null", "string"] }, "event_time": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "platform": { "type": ["null", "string"] @@ -36,10 +38,12 @@ "type": ["null", "integer"] }, "processed_time": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "user_creation_time": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "version_name": { "type": ["null", "string"] @@ -60,7 +64,8 @@ "type": ["null", "object"] }, "client_upload_time": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "$insert_id": { "type": ["null", "string"] @@ -87,7 +92,8 @@ "type": ["null", "number"] }, "server_upload_time": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "event_id": { "type": ["null", "integer"] @@ -147,7 +153,8 @@ "type": ["null", "string"] }, "client_event_time": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" } } } From aedcfed739de1cf9b7816161b2dfa4a4e486281d Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Thu, 2 Jun 2022 14:36:39 +0300 Subject: [PATCH 5/7] fixed --- .../source-amplitude/source_amplitude/api.py | 32 +++++++-- .../source-amplitude/unit_tests/test_api.py | 71 ++++++++++++++----- 2 files changed, 81 insertions(+), 22 deletions(-) diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py index d3222cca9829..b30ea42409b9 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py @@ -69,6 +69,27 @@ def time_interval(self) -> dict: """ pass + def _get_date_time_items_from_schema(self): + """ + Get all properties from schema with format: 'date-time' + """ + result = [] + schema = self.get_json_schema()["properties"] + for key, value in schema.items(): + if value.get("format") == "date-time": + result.append(key) + return result + + def _date_time_to_rfc3339(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + """ + Transform 'date-time' items to RFC3339 format + """ + date_time_fields = self._get_date_time_items_from_schema() + for item in record: + if item in date_time_fields: + record[item] = pendulum.parse(record[item]).to_rfc3339_string() + return record + def _get_end_date(self, current_date: pendulum, end_date: pendulum = pendulum.now()): if current_date.add(**self.time_interval).date() < end_date.date(): end_date = current_date.add(**self.time_interval) @@ -113,7 +134,7 @@ def request_params( class Events(IncrementalAmplitudeStream): cursor_field = "event_time" date_template = "%Y%m%dT%H" - compare_date_template = "%Y-%m-%d %H:%M:%S" + compare_date_template = "%Y-%m-%d %H:%M:%S.%f" primary_key = "uuid" state_checkpoint_interval = 1000 time_interval = {"days": 3} @@ -125,7 +146,12 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str, with zip_file.open(gzip_filename) as file: for record in self._parse_zip_file(file): if record[self.cursor_field] >= state_value: - yield record + yield self._date_time_to_rfc3339(record) # transform all `date-time` to RFC3339 + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + # save state value in source native format + latest_state = pendulum.parse(latest_record[self.cursor_field]).strftime(self.compare_date_template) + return {self.cursor_field: max(latest_state, current_stream_state.get(self.cursor_field, ""))} def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]: with gzip.open(zip_file) as file: @@ -158,11 +184,9 @@ def read_records( end = pendulum.parse(stream_slice["end"]) if start > end: yield from [] - # sometimes the API throws a 404 error for not obvious reasons, we have to handle it and log it. # for example, if there is no data from the specified time period, a 404 exception is thrown # https://developers.amplitude.com/docs/export-api#status-codes - try: self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%dT%H')} - {end.strftime('%Y-%m-%dT%H')}") yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state) diff --git a/airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py b/airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py index 8c73b502f045..a60e533d6ec5 100644 --- a/airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py +++ b/airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py @@ -165,21 +165,56 @@ def test_get_end_date(self, stream_cls, expected): expected = now.strftime(stream.date_template) assert stream._get_end_date(yesterday).strftime(stream.date_template) == expected - class TestEventsStream: - def test_parse_zip(self): - stream = Events(pendulum.now().isoformat()) - expected = [{"id": 123}] - result = list(stream._parse_zip_file("unit_tests/api_data/zipped.json")) - assert expected == result - - def test_stream_slices(self): - stream = Events(pendulum.now().isoformat()) - now = pendulum.now() - expected = [{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}] - assert expected == stream.stream_slices() - - def test_request_params(self): - stream = Events(pendulum.now().isoformat()) - now = pendulum.now().subtract(hours=6) - slice = {"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)} - assert slice == stream.request_params(slice) + +class TestEventsStream: + def test_parse_zip(self): + stream = Events(pendulum.now().isoformat()) + expected = [{"id": 123}] + result = list(stream._parse_zip_file("unit_tests/api_data/zipped.json")) + assert expected == result + + def test_stream_slices(self): + stream = Events(pendulum.now().isoformat()) + now = pendulum.now() + expected = [{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}] + assert expected == stream.stream_slices() + + def test_request_params(self): + stream = Events(pendulum.now().isoformat()) + now = pendulum.now().subtract(hours=6) + slice = {"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)} + assert slice == stream.request_params(slice) + + def test_get_updated_state(self): + stream = Events(pendulum.now().isoformat()) + current_state = {"event_time": ""} + latest_record = {"event_time": "2021-05-27 11:59:53.710000"} + result = stream.get_updated_state(current_state, latest_record) + assert result == latest_record + + def test_get_date_time_items_from_schema(self): + stream = Events(pendulum.now().isoformat()) + expected = [ + "server_received_time", + "event_time", + "processed_time", + "user_creation_time", + "client_upload_time", + "server_upload_time", + "client_event_time", + ] + result = stream._get_date_time_items_from_schema() + assert result == expected + + @pytest.mark.parametrize( + "record, expected", + [ + ({}, {}), + ({"event_time": "2021-05-27 11:59:53.710000"}, {"event_time": "2021-05-27T11:59:53.710000+00:00"}), + ], + ids=["empty_record", "transformed_record"], + ) + def test_date_time_to_rfc3339(self, record, expected): + stream = Events(pendulum.now().isoformat()) + result = stream._date_time_to_rfc3339(record) + assert result == expected From 5a0ce03bc7c499e0e484a12dc3fe5908bc8d865f Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 3 Jun 2022 14:07:33 +0300 Subject: [PATCH 6/7] updated after review --- .../source-amplitude/source_amplitude/api.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py index b30ea42409b9..daac4eb408b2 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py @@ -56,6 +56,7 @@ class IncrementalAmplitudeStream(AmplitudeStream, ABC): base_params = {} cursor_field = "date" date_template = "%Y%m%d" + compare_date_template = None def __init__(self, start_date: str, **kwargs): super().__init__(**kwargs) @@ -96,10 +97,12 @@ def _get_end_date(self, current_date: pendulum, end_date: pendulum = pendulum.no return end_date def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - latest_benchmark = latest_record[self.cursor_field] - if current_stream_state.get(self.cursor_field): - return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])} - return {self.cursor_field: latest_benchmark} + # save state value in source native format + if self.compare_date_template: + latest_state = pendulum.parse(latest_record[self.cursor_field]).strftime(self.compare_date_template) + else: + latest_state = latest_record[self.cursor_field] + return {self.cursor_field: max(latest_state, current_stream_state.get(self.cursor_field, ""))} def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: parsed = urlparse.urlparse(response.url) @@ -148,11 +151,6 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str, if record[self.cursor_field] >= state_value: yield self._date_time_to_rfc3339(record) # transform all `date-time` to RFC3339 - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - # save state value in source native format - latest_state = pendulum.parse(latest_record[self.cursor_field]).strftime(self.compare_date_template) - return {self.cursor_field: max(latest_state, current_stream_state.get(self.cursor_field, ""))} - def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]: with gzip.open(zip_file) as file: for record in file: From 98f3d2dc21926451d309fa07d15385df59fa507f Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Fri, 3 Jun 2022 14:39:22 +0000 Subject: [PATCH 7/7] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 4bb2d01bf487..009f5909aa78 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -40,7 +40,7 @@ - name: Amplitude sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396 dockerRepository: airbyte/source-amplitude - dockerImageTag: 0.1.7 + dockerImageTag: 0.1.8 documentationUrl: https://docs.airbyte.io/integrations/sources/amplitude icon: amplitude.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 13297bcff478..09a8d3eca86f 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -547,7 +547,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-amplitude:0.1.7" +- dockerImage: "airbyte/source-amplitude:0.1.8" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/amplitude" connectionSpecification: