Skip to content

Commit

Permalink
🎉 Source Zendesk Support: use cursor-based pagination (#8622)
Browse files Browse the repository at this point in the history
* feat(zendesk): Add Brands and CustomRoles

* feat(zendesk): add incremental unsorted cursor stream
implement IncrementalUnsortedCursorStream to ticket_metrics

* feat(zendesk): use sorted cursor pagination
for ticket comments and macros

* feat(zendesk): use unsorted cursor stream
for groups, group memberships and satisfaction ratings

* fix(zendesk): use safe method to get value from nested dict

* style(zendesk): reformat using gradlew

* fix(zendesk): format created_at and updated_at to date-time format

* feat(zendesk): add business hours schedule

* bump connector version

* bump dockerfile version

Co-authored-by: Marcos Marx <marcosmarxm@gmail.com>
  • Loading branch information
asyarif93 and marcosmarxm authored Dec 17, 2021
1 parent 1f580ed commit 2eace67
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "79c1aa37-dae3-42ae-b333-d1c105477715",
"name": "Zendesk Support",
"dockerRepository": "airbyte/source-zendesk-support",
"dockerImageTag": "0.1.9",
"dockerImageTag": "0.1.10",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/zendesk-support",
"icon": "zendesk.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@
- name: Zendesk Support
sourceDefinitionId: 79c1aa37-dae3-42ae-b333-d1c105477715
dockerRepository: airbyte/source-zendesk-support
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-support
icon: zendesk.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7084,7 +7084,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-zendesk-support:0.1.9"
- dockerImage: "airbyte/source-zendesk-support:0.1.10"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/zendesk-support"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-zendesk-support
Original file line number Diff line number Diff line change
Expand Up @@ -323,29 +323,40 @@ def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) ->
return params


class IncrementalUnsortedCursorStream(IncrementalUnsortedStream, ABC):
"""Stream for loading without sorting but with cursor based pagination"""

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
has_more = response.json().get("meta", {}).get("has_more")
if not has_more:
self._finished = True
return None
return response.json().get("meta", {}).get("after_cursor")

def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(next_page_token=next_page_token, **kwargs)
params["page[size]"] = self.page_size
if next_page_token:
params["page[after]"] = next_page_token
return params


class FullRefreshStream(IncrementalUnsortedPageStream, ABC):
""" "Stream for endpoints where there are not any created_at or updated_at fields"""

# reset to default value
cursor_field = SourceZendeskSupportStream.cursor_field


class IncrementalSortedCursorStream(IncrementalUnsortedStream, ABC):
class IncrementalSortedCursorStream(IncrementalUnsortedCursorStream, ABC):
"""Stream for loading sorting data with cursor based pagination"""

def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(next_page_token=next_page_token, **kwargs)
params.update({"sort_by": self.cursor_field, "sort_order": "desc", "limit": self.page_size})

if next_page_token:
params["cursor"] = next_page_token
def request_params(self, **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(**kwargs)
if params:
params.update({"sort_by": self.cursor_field, "sort_order": "desc"})
return params

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self.is_finished:
return None
return response.json().get("before_cursor")


class IncrementalSortedPageStream(IncrementalUnsortedPageStream, ABC):
"""Stream for loading sorting data with normal pagination"""
Expand All @@ -357,7 +368,7 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]:
return params


class TicketComments(IncrementalSortedPageStream):
class TicketComments(IncrementalSortedCursorStream):
"""TicketComments stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_comments/
ZenDesk doesn't provide API for loading of all comments by one direct endpoints.
Thus at first we loads all updated tickets and after this tries to load all created/updated
Expand All @@ -368,7 +379,7 @@ class TicketComments(IncrementalSortedPageStream):
raise_on_http_errors = False

response_list_name = "comments"
cursor_field = IncrementalSortedPageStream.created_at_field
cursor_field = IncrementalSortedCursorStream.created_at_field

def __init__(self, **kwargs):
super().__init__(**kwargs)
Expand Down Expand Up @@ -464,7 +475,8 @@ def parse_response(
# 2) pagination and sorting mechanism
# 3) cursor pagination and sorting mechanism
# 4) without sorting but with pagination
# 5) without created_at/updated_at fields
# 5) without sorting but with cursor pagination
# 6) without created_at/updated_at fields

# endpoints provide a built-in incremental approach

Expand Down Expand Up @@ -506,15 +518,15 @@ def get_last_end_time(self) -> Optional[Union[str, int]]:
# endpoints provide a pagination mechanism but we can't manage a response order


class Groups(IncrementalUnsortedPageStream):
class Groups(IncrementalUnsortedCursorStream):
"""Groups stream: https://developer.zendesk.com/api-reference/ticketing/groups/groups/"""


class GroupMemberships(IncrementalUnsortedPageStream):
class GroupMemberships(IncrementalUnsortedCursorStream):
"""GroupMemberships stream: https://developer.zendesk.com/api-reference/ticketing/groups/group_memberships/"""


class SatisfactionRatings(IncrementalUnsortedPageStream):
class SatisfactionRatings(IncrementalUnsortedCursorStream):
"""SatisfactionRatings stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/satisfaction_ratings/
The ZenDesk API for this stream provides the filter "start_time" that can be used for incremental logic
Expand Down Expand Up @@ -546,24 +558,32 @@ class TicketForms(IncrementalUnsortedPageStream):
"""TicketForms stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_forms/"""


class TicketMetrics(IncrementalUnsortedPageStream):
class TicketMetrics(IncrementalUnsortedCursorStream):
"""TicketMetric stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metrics/"""

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
# Tickets are ordered chronologically by created date, from newest to oldest.
# No need to get next page once cursor passed initial state
if self.is_finished:
return None

return super().next_page_token(response)


class TicketMetricEvents(IncrementalExportStream):
"""TicketMetricEvents stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metric_events/"""

cursor_field = "time"


class Macros(IncrementalSortedPageStream):
class Macros(IncrementalSortedCursorStream):
"""Macros stream: https://developer.zendesk.com/api-reference/ticketing/business-rules/macros/"""


# endpoints provide a cursor pagination and sorting mechanism


class TicketAudits(IncrementalSortedCursorStream):
class TicketAudits(IncrementalUnsortedStream):
"""TicketAudits stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_audits/"""

# can request a maximum of 1,000 results
Expand All @@ -574,6 +594,20 @@ class TicketAudits(IncrementalSortedCursorStream):
# Root of response is 'audits'. As rule as an endpoint name is equal a response list name
response_list_name = "audits"

# This endpoint uses a variant of cursor pagination with some differences from cursor pagination used in other endpoints.
def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(next_page_token=next_page_token, **kwargs)
params.update({"sort_by": self.cursor_field, "sort_order": "desc", "limit": self.page_size})

if next_page_token:
params["cursor"] = next_page_token
return params

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self.is_finished:
return None
return response.json().get("before_cursor")


# endpoints don't provide the updated_at/created_at fields
# thus we can't implement an incremental logic for them
Expand Down

0 comments on commit 2eace67

Please sign in to comment.