diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 3f236be093df..6ad5de1ce01c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -843,7 +843,7 @@ - name: Zendesk Support sourceDefinitionId: 79c1aa37-dae3-42ae-b333-d1c105477715 dockerRepository: airbyte/source-zendesk-support - dockerImageTag: 0.2.1 + dockerImageTag: 0.2.2 documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-support icon: zendesk.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e8552cfb1ee3..2dcf7000209c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -8905,7 +8905,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-zendesk-support:0.2.1" +- dockerImage: "airbyte/source-zendesk-support:0.2.2" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/zendesk-support" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile index b542ce4cb4e9..03358e75d66f 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile +++ b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile @@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.1 +LABEL io.airbyte.version=0.2.2 LABEL io.airbyte.name=airbyte/source-zendesk-support diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_comments.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_comments.json index df3aa01c3bb6..4fb30ea5cb38 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_comments.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_comments.json @@ -4,6 +4,9 @@ "type": ["null", "string"], "format": "date-time" }, + "timestamp": { + "type": ["null", "integer"] + }, "body": { "type": ["null", "string"] }, @@ -16,6 +19,9 @@ "type": { "type": ["null", "string"] }, + "via_reference_id": { + "type": ["null", "integer"] + }, "html_body": { "type": ["null", "string"] }, diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index 47509ac5b444..c35f75f72425 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -25,8 +25,9 @@ from requests.auth import AuthBase from requests_futures.sessions import PICKLE_ERROR, FuturesSession -DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" -LAST_END_TIME_KEY = "_last_end_time" +DATETIME_FORMAT: str = "%Y-%m-%dT%H:%M:%SZ" +LAST_END_TIME_KEY: str = "_last_end_time" +END_OF_STREAM_KEY: str = "end_of_stream" class SourceZendeskException(Exception): @@ -114,6 +115,12 @@ def str2unixtime(str_dt: str) -> Optional[int]: dt = datetime.strptime(str_dt, DATETIME_FORMAT) return calendar.timegm(dt.utctimetuple()) + @staticmethod + def _parse_next_page_number(response: requests.Response) -> Optional[int]: + """Parses a response and tries to find next page number""" + next_page = response.json().get("next_page") + return dict(parse_qsl(urlparse(next_page).query)).get("page") if next_page else None + def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: """try to select relevant data only""" @@ -149,6 +156,16 @@ def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None, **k self._session.auth = authenticator self.future_requests = deque() + @property + def url_base(self) -> str: + return f"https://{self._subdomain}.zendesk.com/api/v2/" + + def path(self, **kwargs): + return self.name + + def next_page_token(self, *args, **kwargs): + return None + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: latest_benchmark = latest_record[self.cursor_field] if current_stream_state.get(self.cursor_field): @@ -270,24 +287,6 @@ def read_records( else: yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice) - @property - def url_base(self) -> str: - return f"https://{self._subdomain}.zendesk.com/api/v2/" - - @staticmethod - def _parse_next_page_number(response: requests.Response) -> Optional[int]: - """Parses a response and tries to find next page number""" - next_page = response.json().get("next_page") - if next_page: - return dict(parse_qsl(urlparse(next_page).query)).get("page") - return None - - def path(self, **kwargs): - return self.name - - def next_page_token(self, *args, **kwargs): - return None - class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream): """ @@ -305,14 +304,6 @@ def url_base(self) -> str: def path(self, **kwargs): return self.name - @staticmethod - def _parse_next_page_number(response: requests.Response) -> Optional[int]: - """Parses a response and tries to find next page number""" - next_page = response.json().get("next_page") - if next_page: - return dict(parse_qsl(urlparse(next_page).query)).get("page") - return None - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: next_page = self._parse_next_page_number(response) if not next_page: @@ -351,17 +342,74 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, self.prev_start_time = start_time return {self.cursor_field: start_time} - def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: + def request_params( + self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs + ) -> MutableMapping[str, Any]: next_page_token = next_page_token or {} + if stream_state: + # use the state value if exists + parsed_state = calendar.timegm(pendulum.parse(stream_state.get(self.cursor_field)).utctimetuple()) + else: + # for full-refresh use start_date + parsed_state = calendar.timegm(pendulum.parse(self._start_date).utctimetuple()) if self.cursor_field: - params = { - "start_time": next_page_token.get(self.cursor_field, calendar.timegm(pendulum.parse(self._start_date).utctimetuple())) - } + params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)} else: params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())} return params +class ZendeskSupportTicketEventsExportStream(SourceZendeskSupportCursorPaginationStream): + """Incremental Export from TicketEvents stream: + https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export + + @ param response_list_name: the main nested entity to look at inside of response, defualt = "ticket_events" + @ param response_target_entity: nested property inside of `response_list_name`, default = "child_events" + @ param list_entities_from_event : the list of nested child_events entities to include from parent record + @ param sideload_param : parameter variable to include various information to child_events property + more info: https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints + @ param event_type : specific event_type to check ["Audit", "Change", "Comment", etc] + """ + + response_list_name: str = "ticket_events" + response_target_entity: str = "child_events" + list_entities_from_event: List[str] = None + sideload_param: str = None + event_type: str = None + + @property + def update_event_from_record(self) -> bool: + """Returns True/False based on list_entities_from_event property""" + return True if len(self.list_entities_from_event) > 0 else False + + def path(self, **kwargs) -> str: + return "incremental/ticket_events" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + """ + Returns next_page_token based on `end_of_stream` parameter inside of response + """ + next_page_token = super().next_page_token(response) + return None if response.json().get(END_OF_STREAM_KEY, False) else next_page_token + + def request_params( + self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs + ) -> MutableMapping[str, Any]: + params = super().request_params(stream_state, next_page_token, **kwargs) + if self.sideload_param: + params["include"] = self.sideload_param + return params + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + for record in response.json().get(self.response_list_name, []): + for event in record.get(self.response_target_entity, []): + if event.get("event_type") == self.event_type: + if self.update_event_from_record: + for prop in self.list_entities_from_event: + event[prop] = record.get(prop) + yield event + + class Users(SourceZendeskSupportStream): """Users stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/""" @@ -385,32 +433,15 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]: return params -class TicketComments(SourceZendeskSupportStream): - """TicketComments stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_comments/ - ZenDesk doesn't provide API for loading of all comments by one direct endpoints. - Thus at first we loads all updated tickets and after this tries to load all created/updated - comments per every ticket""" - - # Tickets can be removed throughout synchronization. The ZendDesk API will return a response - # with 404 code if a ticket is not exists. But it shouldn't break loading of other comments. - # raise_on_http_errors = False +class TicketComments(ZendeskSupportTicketEventsExportStream): + """ + Fetch the TicketComments incrementaly from TicketEvents Export stream + """ - parent = Tickets cursor_field = "created_at" - - response_list_name = "comments" - - def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - ticket_id = stream_slice["id"] - return f"tickets/{ticket_id}/comments" - - def stream_slices( - self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None - ) -> Iterable[Optional[Mapping[str, Any]]]: - tickets_stream = self.parent(start_date=self._start_date, subdomain=self._subdomain, authenticator=self._session.auth) - for ticket in tickets_stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=cursor_field, stream_state=stream_state): - if ticket["comment_count"]: - yield {"id": ticket["id"], "child_count": ticket["comment_count"]} + list_entities_from_event = ["via_reference_id", "ticket_id", "timestamp"] + sideload_param = "comment_events" + event_type = "Comment" class Groups(SourceZendeskSupportStream): diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py new file mode 100644 index 000000000000..902cd43ea885 --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py @@ -0,0 +1,120 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +import calendar +from datetime import datetime +from urllib.parse import parse_qsl, urlparse + +import pendulum +import pytz +import requests +from source_zendesk_support.source import BasicApiTokenAuthenticator +from source_zendesk_support.streams import DATETIME_FORMAT, END_OF_STREAM_KEY, BaseSourceZendeskSupportStream, TicketComments + +# config +STREAM_ARGS = { + "subdomain": "test", + "start_date": "2022-01-27T00:00:00Z", + "authenticator": BasicApiTokenAuthenticator("test@airbyte.io", "api_token"), +} + +DATETIME_STR = "2021-07-22T06:55:55Z" +DATETIME_FROM_STR = datetime.strptime(DATETIME_STR, DATETIME_FORMAT) +STREAM_URL = "https://subdomain.zendesk.com/api/v2/stream.json?&start_time=1647532987&page=1" +STREAM_RESPONSE: dict = { + "ticket_events": [ + { + "child_events": [ + { + "id": 99999, + "via": {}, + "via_reference_id": None, + "type": "Comment", + "author_id": 10, + "body": "test_comment", + "html_body": '