🐛 Source Amplitude: Fixed JSON Validator date-time validation #13373

Merged
@@ -40,7 +40,7 @@
- name: Amplitude
sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396
dockerRepository: airbyte/source-amplitude
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/sources/amplitude
icon: amplitude.svg
sourceType: api
@@ -547,7 +547,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-amplitude:0.1.7"
- dockerImage: "airbyte/source-amplitude:0.1.8"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/amplitude"
connectionSpecification:
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/source-amplitude
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-amplitude/README.md
@@ -99,7 +99,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Source Acce
If your connector requires creating or destroying resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py`.
To run your integration tests with acceptance tests, from the connector root, run
```
python -m pytest integration_tests -p integration_tests.acceptance
docker build . --no-cache -t airbyte/source-amplitude:dev \
&& python -m pytest -p source_acceptance_test.plugin
```
To run your integration tests with docker

@@ -56,6 +56,7 @@ class IncrementalAmplitudeStream(AmplitudeStream, ABC):
base_params = {}
cursor_field = "date"
date_template = "%Y%m%d"
compare_date_template = None

def __init__(self, start_date: str, **kwargs):
super().__init__(**kwargs)
@@ -69,16 +70,39 @@ def time_interval(self) -> dict:
"""
pass

def _get_date_time_items_from_schema(self):
"""
Get all properties from schema with format: 'date-time'
"""
result = []
schema = self.get_json_schema()["properties"]
for key, value in schema.items():
if value.get("format") == "date-time":
result.append(key)
return result

def _date_time_to_rfc3339(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Transform 'date-time' items to RFC3339 format
"""
date_time_fields = self._get_date_time_items_from_schema()
for item in record:
if item in date_time_fields:
record[item] = pendulum.parse(record[item]).to_rfc3339_string()
return record

def _get_end_date(self, current_date: pendulum, end_date: pendulum = pendulum.now()):
if current_date.add(**self.time_interval).date() < end_date.date():
end_date = current_date.add(**self.time_interval)
return end_date

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
latest_benchmark = latest_record[self.cursor_field]
if current_stream_state.get(self.cursor_field):
return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])}
return {self.cursor_field: latest_benchmark}
# save state value in source native format
if self.compare_date_template:
latest_state = pendulum.parse(latest_record[self.cursor_field]).strftime(self.compare_date_template)
else:
latest_state = latest_record[self.cursor_field]
return {self.cursor_field: max(latest_state, current_stream_state.get(self.cursor_field, ""))}

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
parsed = urlparse.urlparse(response.url)
@@ -113,7 +137,7 @@ def request_params(
class Events(IncrementalAmplitudeStream):
cursor_field = "event_time"
date_template = "%Y%m%dT%H"
compare_date_template = "%Y-%m-%d %H:%M:%S"
compare_date_template = "%Y-%m-%d %H:%M:%S.%f"
primary_key = "uuid"
state_checkpoint_interval = 1000
time_interval = {"days": 3}
@@ -125,7 +149,7 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
with zip_file.open(gzip_filename) as file:
for record in self._parse_zip_file(file):
if record[self.cursor_field] >= state_value:
yield record
yield self._date_time_to_rfc3339(record) # transform all `date-time` to RFC3339

def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]:
with gzip.open(zip_file) as file:
@@ -158,11 +182,9 @@ def read_records(
end = pendulum.parse(stream_slice["end"])
if start > end:
yield from []

# sometimes the API throws a 404 error for not obvious reasons, we have to handle it and log it.
# for example, if there is no data from the specified time period, a 404 exception is thrown
# https://developers.amplitude.com/docs/export-api#status-codes

try:
self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%dT%H')} - {end.strftime('%Y-%m-%dT%H')}")
yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
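
The core of this change is the pair of helpers that look up every schema property declared with `format: date-time` and re-emit those values as RFC 3339 strings before records are yielded. A minimal, standalone sketch of that flow (the sample record and schema properties below are illustrative, not the connector's actual catalog; `pendulum` is the library the stream already uses):

```python
import pendulum

# Illustrative schema fragment: only `format: date-time` properties get converted.
schema_properties = {
    "uuid": {"type": ["null", "string"]},
    "event_time": {"type": ["null", "string"], "format": "date-time"},
    "server_upload_time": {"type": ["null", "string"], "format": "date-time"},
}

def date_time_fields(properties: dict) -> list:
    # Collect every property whose JSON Schema format is `date-time`.
    return [key for key, value in properties.items() if value.get("format") == "date-time"]

def to_rfc3339(record: dict, fields: list) -> dict:
    # Re-emit Amplitude's "YYYY-MM-DD HH:MM:SS.ffffff" timestamps as RFC 3339 strings.
    for field in fields:
        if field in record and record[field]:
            record[field] = pendulum.parse(record[field]).to_rfc3339_string()
    return record

raw = {"uuid": "123", "event_time": "2021-05-27 11:59:53.710000"}
print(to_rfc3339(raw, date_time_fields(schema_properties)))
# {'uuid': '123', 'event_time': '2021-05-27T11:59:53.710000+00:00'}
```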
@@ -4,7 +4,7 @@
"properties": {
"date": {
"type": ["null", "string"],
"format": "date-time"
"format": "date"
},
"details": {
"type": ["null", "string"]
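
For context on the schema change above: the `date` property here holds plain calendar dates (e.g. `2021-05-27`), which are valid against the `date` format but not against `date-time`, so the stricter format produced spurious validation errors. A rough illustration (this uses the `jsonschema` package with its optional format validators, e.g. `rfc3339-validator`, installed — it is not necessarily the validator Airbyte itself runs):

```python
from jsonschema import FormatChecker

checker = FormatChecker()
value = "2021-05-27"  # a plain calendar date, as returned for this field

# Satisfies `date`, but not RFC 3339 `date-time` (which needs a time component).
print(checker.conforms(value, "date"))       # True
print(checker.conforms(value, "date-time"))  # False
```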
@@ -165,21 +165,56 @@ def test_get_end_date(self, stream_cls, expected):
expected = now.strftime(stream.date_template)
assert stream._get_end_date(yesterday).strftime(stream.date_template) == expected

class TestEventsStream:
def test_parse_zip(self):
stream = Events(pendulum.now().isoformat())
expected = [{"id": 123}]
result = list(stream._parse_zip_file("unit_tests/api_data/zipped.json"))
assert expected == result

def test_stream_slices(self):
stream = Events(pendulum.now().isoformat())
now = pendulum.now()
expected = [{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}]
assert expected == stream.stream_slices()

def test_request_params(self):
stream = Events(pendulum.now().isoformat())
now = pendulum.now().subtract(hours=6)
slice = {"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}
assert slice == stream.request_params(slice)

class TestEventsStream:
def test_parse_zip(self):
stream = Events(pendulum.now().isoformat())
expected = [{"id": 123}]
result = list(stream._parse_zip_file("unit_tests/api_data/zipped.json"))
assert expected == result

def test_stream_slices(self):
stream = Events(pendulum.now().isoformat())
now = pendulum.now()
expected = [{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}]
assert expected == stream.stream_slices()

def test_request_params(self):
stream = Events(pendulum.now().isoformat())
now = pendulum.now().subtract(hours=6)
slice = {"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}
assert slice == stream.request_params(slice)

def test_get_updated_state(self):
stream = Events(pendulum.now().isoformat())
current_state = {"event_time": ""}
latest_record = {"event_time": "2021-05-27 11:59:53.710000"}
result = stream.get_updated_state(current_state, latest_record)
assert result == latest_record

def test_get_date_time_items_from_schema(self):
stream = Events(pendulum.now().isoformat())
expected = [
"server_received_time",
"event_time",
"processed_time",
"user_creation_time",
"client_upload_time",
"server_upload_time",
"client_event_time",
]
result = stream._get_date_time_items_from_schema()
assert result == expected

@pytest.mark.parametrize(
"record, expected",
[
({}, {}),
({"event_time": "2021-05-27 11:59:53.710000"}, {"event_time": "2021-05-27T11:59:53.710000+00:00"}),
],
ids=["empty_record", "transformed_record"],
)
def test_date_time_to_rfc3339(self, record, expected):
stream = Events(pendulum.now().isoformat())
result = stream._date_time_to_rfc3339(record)
assert result == expected
3 changes: 2 additions & 1 deletion docs/integrations/sources/amplitude.md
@@ -59,7 +59,8 @@ The Amplitude connector should gracefully handle Amplitude API limitations under

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :----------------------------------------------------- | :------ |
| 0.1.6 | 2022-05-21 | [13074](https://github.com/airbytehq/airbyte/pull/13074) | Removed time offset for `Events` stream, which caused a lot of duplicated records |
| 0.1.8 | 2022-06-01 | [13373](https://github.com/airbytehq/airbyte/pull/13373) | Fixed the issue where the JSON Validator produced errors on the `date-time` check |
| 0.1.7 | 2022-05-21 | [13074](https://github.com/airbytehq/airbyte/pull/13074) | Removed time offset for `Events` stream, which caused a lot of duplicated records |
| 0.1.6 | 2022-04-30 | [12500](https://github.com/airbytehq/airbyte/pull/12500) | Improve input configuration copy |
| 0.1.5 | 2022-04-28 | [12430](https://github.com/airbytehq/airbyte/pull/12430) | Added HTTP error descriptions and fixed `Events` stream fail caused by `404` HTTP Error |
| 0.1.4 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications |