From c4cbae450a44240507d20f960882bca34b0c1edd Mon Sep 17 00:00:00 2001 From: danieldiamond Date: Tue, 19 Apr 2022 12:39:02 +1000 Subject: [PATCH 1/9] Add SourceZendeskUserExportStream --- .../source_zendesk_support/streams.py | 133 ++++++++++++++---- 1 file changed, 104 insertions(+), 29 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index 965b5e710ce7..91a1192481a6 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -177,7 +177,8 @@ def get_api_records_count(self, stream_slice: Mapping[str, Any] = None, stream_s to then correctly generate the pagination parameters. """ - count_url = urljoin(self.url_base, f"{self.path(stream_state=stream_state, stream_slice=stream_slice)}/count.json") + count_url = urljoin( + self.url_base, f"{self.path(stream_state=stream_state, stream_slice=stream_slice)}/count.json") start_date = self._start_date params = {} @@ -198,23 +199,31 @@ def generate_future_requests( stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, ): - records_count = self.get_api_records_count(stream_slice=stream_slice, stream_state=stream_state) + records_count = self.get_api_records_count( + stream_slice=stream_slice, stream_state=stream_state) page_count = ceil(records_count / self.page_size) for page_number in range(1, page_count + 1): - params = self.request_params(stream_state=stream_state, stream_slice=stream_slice) + params = self.request_params( + stream_state=stream_state, stream_slice=stream_slice) params["page"] = page_number - request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice) + request_headers = self.request_headers( + stream_state=stream_state, stream_slice=stream_slice) request = self._create_prepared_request( - path=self.path(stream_state=stream_state, stream_slice=stream_slice), - headers=dict(request_headers, **self.authenticator.get_auth_header()), + path=self.path(stream_state=stream_state, + stream_slice=stream_slice), + headers=dict(request_headers, ** + self.authenticator.get_auth_header()), params=params, - json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice), - data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice), + json=self.request_body_json( + stream_state=stream_state, stream_slice=stream_slice), + data=self.request_body_data( + stream_state=stream_state, stream_slice=stream_slice), ) - request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice) + request_kwargs = self.request_kwargs( + stream_state=stream_state, stream_slice=stream_slice) self.future_requests.append( { "future": self._send_request(request, request_kwargs), @@ -226,7 +235,8 @@ def generate_future_requests( ) def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response: - response: requests.Response = self._session.send_future(request, **request_kwargs) + response: requests.Response = self._session.send_future( + request, **request_kwargs) return response def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response: @@ -241,7 +251,8 @@ def request_params( current_state = stream_state.get(self.cursor_field) if current_state and isinstance(current_state, str) and not current_state.isdigit(): current_state = self.str2unixtime(current_state) - start_time = current_state or calendar.timegm(pendulum.parse(self._start_date).utctimetuple()) + start_time = current_state or calendar.timegm( + pendulum.parse(self._start_date).utctimetuple()) # +1 because the API returns all records where generated_timestamp >= start_time now = calendar.timegm(datetime.now().utctimetuple()) @@ -259,7 +270,8 @@ def read_records( stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: - self.generate_future_requests(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state) + self.generate_future_requests( + sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state) while len(self.future_requests) > 0: item = self.future_requests.popleft() @@ -269,10 +281,12 @@ def read_records( if self.should_retry(response): backoff_time = self.backoff_time(response) if item["retries"] == self.max_retries: - raise DefaultBackoffException(request=item["request"], response=response) + raise DefaultBackoffException( + request=item["request"], response=response) else: if response.elapsed.total_seconds() < backoff_time: - time.sleep(backoff_time - response.elapsed.total_seconds()) + time.sleep(backoff_time - + response.elapsed.total_seconds()) self.future_requests.append( { @@ -289,8 +303,8 @@ def read_records( class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream): """ - # endpoints don't provide the updated_at/created_at fields - # thus we can't implement an incremental logic for them + Endpoints don't provide the updated_at/created_at fields + Thus we can't implement an incremental logic for them """ page_size = 100 @@ -324,7 +338,7 @@ def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> class SourceZendeskSupportCursorPaginationStream(SourceZendeskSupportFullRefreshStream): """ - # endpoints provide a cursor pagination and sorting mechanism + Endpoints provide a cursor pagination and sorting mechanism """ next_page_field = "next_page" @@ -332,12 +346,14 @@ class SourceZendeskSupportCursorPaginationStream(SourceZendeskSupportFullRefresh def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: # try to save maximum value of a cursor field - old_value = str((current_stream_state or {}).get(self.cursor_field, "")) + old_value = str((current_stream_state or {} + ).get(self.cursor_field, "")) new_value = str((latest_record or {}).get(self.cursor_field, "")) return {self.cursor_field: max(new_value, old_value)} def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - start_time = dict(parse_qsl(urlparse(response.json().get(self.next_page_field), "").query)).get("start_time") + start_time = dict(parse_qsl(urlparse(response.json().get( + self.next_page_field), "").query)).get("start_time") if start_time != self.prev_start_time: self.prev_start_time = start_time return {self.cursor_field: int(start_time)} @@ -346,7 +362,8 @@ def check_stream_state(self, stream_state: Mapping[str, Any] = None): """ Returns the state value, if exists. Otherwise, returns user defined `Start Date`. """ - state = stream_state.get(self.cursor_field) or self._start_date if stream_state else self._start_date + state = stream_state.get( + self.cursor_field) or self._start_date if stream_state else self._start_date return calendar.timegm(pendulum.parse(state).utctimetuple()) def request_params( @@ -355,9 +372,11 @@ def request_params( next_page_token = next_page_token or {} parsed_state = self.check_stream_state(stream_state) if self.cursor_field: - params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)} + params = {"start_time": next_page_token.get( + self.cursor_field, parsed_state)} else: - params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())} + params = {"start_time": calendar.timegm( + pendulum.parse(self._start_date).utctimetuple())} return params @@ -382,7 +401,8 @@ def check_start_time_param(requested_start_time: int, value: int = 1): Returns: either close to now UNIX timestamp or previously requested UNIX timestamp. """ - now = calendar.timegm(pendulum.now().subtract(minutes=value).utctimetuple()) + now = calendar.timegm(pendulum.now().subtract( + minutes=value).utctimetuple()) return now if requested_start_time > now else requested_start_time def path(self, **kwargs) -> str: @@ -400,7 +420,8 @@ def request_params( ) -> MutableMapping[str, Any]: params = super().request_params(stream_state, next_page_token, **kwargs) # check "start_time" is not in the future - params["start_time"] = self.check_start_time_param(params["start_time"]) + params["start_time"] = self.check_start_time_param( + params["start_time"]) if self.sideload_param: params["include"] = self.sideload_param return params @@ -441,8 +462,59 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield event -class Users(SourceZendeskSupportStream): - """Users stream: https://developer.zendesk.com/api-reference/ticketing/users/users/""" +class SourceZendeskUserExportStream(SourceZendeskSupportCursorPaginationStream): + """Incremental Export from User stream: + https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-user-export + + @ param response_list_name: the main nested entity to look at inside of response, defualt = response_list_name + @ param sideload_param : parameter variable to include various information to response + more info: https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints + """ + + cursor_field = "updated_at" + response_list_name: str = "users" + sideload_param: str = None + + @staticmethod + def check_start_time_param(requested_start_time: int, value: int = 1): + """ + Requesting users in the future is not allowed, hits 400 - bad request. + We get current UNIX timestamp minus `value` from now(), default = 1 (minute). + + Returns: either close to now UNIX timestamp or previously requested UNIX timestamp. + """ + now = calendar.timegm(pendulum.now().subtract( + minutes=value).utctimetuple()) + return now if requested_start_time > now else requested_start_time + + def path(self, **kwargs) -> str: + return f"incremental/{self.response_list_name}.json" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + """ + Returns next_page_token based on `end_of_stream` parameter inside of response + """ + next_page_token = super().next_page_token(response) + return None if response.json().get(END_OF_STREAM_KEY, False) else next_page_token + + def request_params( + self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs + ) -> MutableMapping[str, Any]: + params = super().request_params(stream_state, next_page_token, **kwargs) + # check "start_time" is not in the future + params["start_time"] = self.check_start_time_param( + params["start_time"]) + if self.sideload_param: + params["include"] = self.sideload_param + return params + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + for record in response.json().get(self.response_list_name, []): + yield record + + +class Users(SourceZendeskUserExportStream): + """Users stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-user-export""" class Organizations(SourceZendeskSupportStream): @@ -483,8 +555,10 @@ def request_params( self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs ) -> MutableMapping[str, Any]: """Adds the filtering field 'start_time'""" - params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs) - start_time = self.str2unixtime((stream_state or {}).get(self.cursor_field)) + params = super().request_params(stream_state=stream_state, + next_page_token=next_page_token, **kwargs) + start_time = self.str2unixtime( + (stream_state or {}).get(self.cursor_field)) if not start_time: start_time = self.str2unixtime(self._start_date) @@ -535,7 +609,8 @@ class TicketAudits(SourceZendeskSupportCursorPaginationStream): # This endpoint uses a variant of cursor pagination with some differences from cursor pagination used in other endpoints. def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: - params = {"sort_by": self.cursor_field, "sort_order": "desc", "limit": self.page_size} + params = {"sort_by": self.cursor_field, + "sort_order": "desc", "limit": self.page_size} if next_page_token: params["cursor"] = next_page_token From abab5528e90da808d262b1d10ea318e06ebfb48e Mon Sep 17 00:00:00 2001 From: danieldiamond Date: Tue, 19 Apr 2022 12:50:23 +1000 Subject: [PATCH 2/9] Format streams --- .../source_zendesk_support/streams.py | 78 +++++++------------ 1 file changed, 26 insertions(+), 52 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index 91a1192481a6..fa337b96e787 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -177,8 +177,7 @@ def get_api_records_count(self, stream_slice: Mapping[str, Any] = None, stream_s to then correctly generate the pagination parameters. """ - count_url = urljoin( - self.url_base, f"{self.path(stream_state=stream_state, stream_slice=stream_slice)}/count.json") + count_url = urljoin(self.url_base, f"{self.path(stream_state=stream_state, stream_slice=stream_slice)}/count.json") start_date = self._start_date params = {} @@ -199,31 +198,23 @@ def generate_future_requests( stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, ): - records_count = self.get_api_records_count( - stream_slice=stream_slice, stream_state=stream_state) + records_count = self.get_api_records_count(stream_slice=stream_slice, stream_state=stream_state) page_count = ceil(records_count / self.page_size) for page_number in range(1, page_count + 1): - params = self.request_params( - stream_state=stream_state, stream_slice=stream_slice) + params = self.request_params(stream_state=stream_state, stream_slice=stream_slice) params["page"] = page_number - request_headers = self.request_headers( - stream_state=stream_state, stream_slice=stream_slice) + request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice) request = self._create_prepared_request( - path=self.path(stream_state=stream_state, - stream_slice=stream_slice), - headers=dict(request_headers, ** - self.authenticator.get_auth_header()), + path=self.path(stream_state=stream_state, stream_slice=stream_slice), + headers=dict(request_headers, **self.authenticator.get_auth_header()), params=params, - json=self.request_body_json( - stream_state=stream_state, stream_slice=stream_slice), - data=self.request_body_data( - stream_state=stream_state, stream_slice=stream_slice), + json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice), + data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice), ) - request_kwargs = self.request_kwargs( - stream_state=stream_state, stream_slice=stream_slice) + request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice) self.future_requests.append( { "future": self._send_request(request, request_kwargs), @@ -235,8 +226,7 @@ def generate_future_requests( ) def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response: - response: requests.Response = self._session.send_future( - request, **request_kwargs) + response: requests.Response = self._session.send_future(request, **request_kwargs) return response def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response: @@ -251,8 +241,7 @@ def request_params( current_state = stream_state.get(self.cursor_field) if current_state and isinstance(current_state, str) and not current_state.isdigit(): current_state = self.str2unixtime(current_state) - start_time = current_state or calendar.timegm( - pendulum.parse(self._start_date).utctimetuple()) + start_time = current_state or calendar.timegm(pendulum.parse(self._start_date).utctimetuple()) # +1 because the API returns all records where generated_timestamp >= start_time now = calendar.timegm(datetime.now().utctimetuple()) @@ -270,8 +259,7 @@ def read_records( stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: - self.generate_future_requests( - sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state) + self.generate_future_requests(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state) while len(self.future_requests) > 0: item = self.future_requests.popleft() @@ -281,12 +269,10 @@ def read_records( if self.should_retry(response): backoff_time = self.backoff_time(response) if item["retries"] == self.max_retries: - raise DefaultBackoffException( - request=item["request"], response=response) + raise DefaultBackoffException(request=item["request"], response=response) else: if response.elapsed.total_seconds() < backoff_time: - time.sleep(backoff_time - - response.elapsed.total_seconds()) + time.sleep(backoff_time - response.elapsed.total_seconds()) self.future_requests.append( { @@ -346,14 +332,12 @@ class SourceZendeskSupportCursorPaginationStream(SourceZendeskSupportFullRefresh def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: # try to save maximum value of a cursor field - old_value = str((current_stream_state or {} - ).get(self.cursor_field, "")) + old_value = str((current_stream_state or {}).get(self.cursor_field, "")) new_value = str((latest_record or {}).get(self.cursor_field, "")) return {self.cursor_field: max(new_value, old_value)} def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - start_time = dict(parse_qsl(urlparse(response.json().get( - self.next_page_field), "").query)).get("start_time") + start_time = dict(parse_qsl(urlparse(response.json().get(self.next_page_field), "").query)).get("start_time") if start_time != self.prev_start_time: self.prev_start_time = start_time return {self.cursor_field: int(start_time)} @@ -362,8 +346,7 @@ def check_stream_state(self, stream_state: Mapping[str, Any] = None): """ Returns the state value, if exists. Otherwise, returns user defined `Start Date`. """ - state = stream_state.get( - self.cursor_field) or self._start_date if stream_state else self._start_date + state = stream_state.get(self.cursor_field) or self._start_date if stream_state else self._start_date return calendar.timegm(pendulum.parse(state).utctimetuple()) def request_params( @@ -372,11 +355,9 @@ def request_params( next_page_token = next_page_token or {} parsed_state = self.check_stream_state(stream_state) if self.cursor_field: - params = {"start_time": next_page_token.get( - self.cursor_field, parsed_state)} + params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)} else: - params = {"start_time": calendar.timegm( - pendulum.parse(self._start_date).utctimetuple())} + params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())} return params @@ -401,8 +382,7 @@ def check_start_time_param(requested_start_time: int, value: int = 1): Returns: either close to now UNIX timestamp or previously requested UNIX timestamp. """ - now = calendar.timegm(pendulum.now().subtract( - minutes=value).utctimetuple()) + now = calendar.timegm(pendulum.now().subtract(minutes=value).utctimetuple()) return now if requested_start_time > now else requested_start_time def path(self, **kwargs) -> str: @@ -420,8 +400,7 @@ def request_params( ) -> MutableMapping[str, Any]: params = super().request_params(stream_state, next_page_token, **kwargs) # check "start_time" is not in the future - params["start_time"] = self.check_start_time_param( - params["start_time"]) + params["start_time"] = self.check_start_time_param(params["start_time"]) if self.sideload_param: params["include"] = self.sideload_param return params @@ -483,8 +462,7 @@ def check_start_time_param(requested_start_time: int, value: int = 1): Returns: either close to now UNIX timestamp or previously requested UNIX timestamp. """ - now = calendar.timegm(pendulum.now().subtract( - minutes=value).utctimetuple()) + now = calendar.timegm(pendulum.now().subtract(minutes=value).utctimetuple()) return now if requested_start_time > now else requested_start_time def path(self, **kwargs) -> str: @@ -502,8 +480,7 @@ def request_params( ) -> MutableMapping[str, Any]: params = super().request_params(stream_state, next_page_token, **kwargs) # check "start_time" is not in the future - params["start_time"] = self.check_start_time_param( - params["start_time"]) + params["start_time"] = self.check_start_time_param(params["start_time"]) if self.sideload_param: params["include"] = self.sideload_param return params @@ -555,10 +532,8 @@ def request_params( self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs ) -> MutableMapping[str, Any]: """Adds the filtering field 'start_time'""" - params = super().request_params(stream_state=stream_state, - next_page_token=next_page_token, **kwargs) - start_time = self.str2unixtime( - (stream_state or {}).get(self.cursor_field)) + params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs) + start_time = self.str2unixtime((stream_state or {}).get(self.cursor_field)) if not start_time: start_time = self.str2unixtime(self._start_date) @@ -609,8 +584,7 @@ class TicketAudits(SourceZendeskSupportCursorPaginationStream): # This endpoint uses a variant of cursor pagination with some differences from cursor pagination used in other endpoints. def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: - params = {"sort_by": self.cursor_field, - "sort_order": "desc", "limit": self.page_size} + params = {"sort_by": self.cursor_field, "sort_order": "desc", "limit": self.page_size} if next_page_token: params["cursor"] = next_page_token From c41737bfd96a73532be2537cc6b7625d2963c00c Mon Sep 17 00:00:00 2001 From: danieldiamond Date: Tue, 19 Apr 2022 13:20:35 +1000 Subject: [PATCH 3/9] Update zendesk-support README --- docs/integrations/sources/zendesk-support.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/integrations/sources/zendesk-support.md b/docs/integrations/sources/zendesk-support.md index a4219d945e6a..6bcacc4cb2c3 100644 --- a/docs/integrations/sources/zendesk-support.md +++ b/docs/integrations/sources/zendesk-support.md @@ -27,7 +27,7 @@ This Source is capable of syncing the following core Streams: * [Ticket Forms](https://developer.zendesk.com/rest_api/docs/support/ticket_forms) * [Ticket Metrics](https://developer.zendesk.com/rest_api/docs/support/ticket_metrics) * [Ticket Metric Events](https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metric_events/) -* [Users](https://developer.zendesk.com/rest_api/docs/support/users) +* [Users](https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-user-export) ### Data type mapping @@ -62,7 +62,7 @@ The Zendesk connector should not run into Zendesk API limitations under normal u ### Requirements -* `Subdomain` - this is your Zendesk subdomain that can be found in your account URL. For example, in `https://{MY_SUBDOMAIN}.zendesk.com/`, where `MY_SUBDOMAIN` is the value of your subdomain. +* `Subdomain` - this is your Zendesk subdomain that can be found in your account URL. For example, in `https://{MY_SUBDOMAIN}.zendesk.com/`, where `MY_SUBDOMAIN` is the value of your subdomain. * `Authentication` - Zendesk service provides two authentication methods. Choose between: `OAuth2.0` or `API token`. * Authentication using `OAuth2.0` (Only for Airbyte Cloud) - obtain `access_token` by authorising using your Zendesk Account credentials. Simply proceed by pressing "Authenticate your Zendesk Account" and complete the authentication. * Authentication using `API Token`: @@ -74,6 +74,7 @@ The Zendesk connector should not run into Zendesk API limitations under normal u | Version | Date | Pull Request | Subject | |:---------|:-----------| :----- |:-------------------------------------------------------| +| `0.2.6` | 2022-04-19 | [12122](https://github.com/airbytehq/airbyte/pull/12122) | Fixed the bug when only 100,000 Users are synced | `0.2.5` | 2022-04-05 | [11727](https://github.com/airbytehq/airbyte/pull/11727) | Fixed the bug when state was not parsed correctly | `0.2.4` | 2022-04-04 | [11688](https://github.com/airbytehq/airbyte/pull/11688) | Small documentation corrections | `0.2.3` | 2022-03-23 | [11349](https://github.com/airbytehq/airbyte/pull/11349) | Fixed the bug when Tickets stream didn't return deleted records From a7217c398eaf14aba718387e4b09f3e1ecdeb9bb Mon Sep 17 00:00:00 2001 From: danieldiamond Date: Tue, 19 Apr 2022 13:40:58 +1000 Subject: [PATCH 4/9] Update dockerfile --- .../connectors/source-zendesk-support/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile index 69cb8c312ea3..72bc2ab36ba5 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile +++ b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile @@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.5 +LABEL io.airbyte.version=0.2.6 LABEL io.airbyte.name=airbyte/source-zendesk-support From 88e337742e6bd2f3f58f5160f818402c6791be48 Mon Sep 17 00:00:00 2001 From: danieldiamond Date: Tue, 19 Apr 2022 15:52:52 +1000 Subject: [PATCH 5/9] Update changelog to reflect additional fixes --- docs/integrations/sources/zendesk-support.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/zendesk-support.md b/docs/integrations/sources/zendesk-support.md index 6bcacc4cb2c3..1376d6c0d07f 100644 --- a/docs/integrations/sources/zendesk-support.md +++ b/docs/integrations/sources/zendesk-support.md @@ -74,7 +74,7 @@ The Zendesk connector should not run into Zendesk API limitations under normal u | Version | Date | Pull Request | Subject | |:---------|:-----------| :----- |:-------------------------------------------------------| -| `0.2.6` | 2022-04-19 | [12122](https://github.com/airbytehq/airbyte/pull/12122) | Fixed the bug when only 100,000 Users are synced +| `0.2.6` | 2022-04-19 | [12122](https://github.com/airbytehq/airbyte/pull/12122) | Fixed the bug when only 100,000 Users are synced [11895](https://github.com/airbytehq/airbyte/issues/11895) and fixed bug when `start_date` is not used on user stream [12059](https://github.com/airbytehq/airbyte/issues/12059). | `0.2.5` | 2022-04-05 | [11727](https://github.com/airbytehq/airbyte/pull/11727) | Fixed the bug when state was not parsed correctly | `0.2.4` | 2022-04-04 | [11688](https://github.com/airbytehq/airbyte/pull/11688) | Small documentation corrections | `0.2.3` | 2022-03-23 | [11349](https://github.com/airbytehq/airbyte/pull/11349) | Fixed the bug when Tickets stream didn't return deleted records From 73bb97d0e4d34da4cb806c55fcccff10653a921c Mon Sep 17 00:00:00 2001 From: Baz Date: Tue, 19 Apr 2022 19:53:38 +0300 Subject: [PATCH 6/9] Review updates --- .../source_zendesk_support/streams.py | 63 +++---------------- 1 file changed, 8 insertions(+), 55 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index fa337b96e787..4b82aad6959d 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -361,17 +361,17 @@ def request_params( return params -class SourceZendeskTicketExportStream(SourceZendeskSupportCursorPaginationStream): +class SourceZendeskIncrementalExportStream(SourceZendeskSupportCursorPaginationStream): """Incremental Export from Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based - @ param response_list_name: the main nested entity to look at inside of response, defualt = response_list_name + @ param response_list_name: the main nested entity to look at inside of response, default = response_list_name @ param sideload_param : parameter variable to include various information to response more info: https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints """ cursor_field = "updated_at" - response_list_name: str = "tickets" + response_list_name: str = None sideload_param: str = None @staticmethod @@ -410,11 +410,11 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield record -class SourceZendeskSupportTicketEventsExportStream(SourceZendeskTicketExportStream): +class SourceZendeskSupportTicketEventsExportStream(SourceZendeskIncrementalExportStream): """Incremental Export from TicketEvents stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export - @ param response_list_name: the main nested entity to look at inside of response, defualt = "ticket_events" + @ param response_list_name: the main nested entity to look at inside of response, default = "ticket_events" @ param response_target_entity: nested property inside of `response_list_name`, default = "child_events" @ param list_entities_from_event : the list of nested child_events entities to include from parent record @ param event_type : specific event_type to check ["Audit", "Change", "Comment", etc] @@ -441,58 +441,10 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield event -class SourceZendeskUserExportStream(SourceZendeskSupportCursorPaginationStream): - """Incremental Export from User stream: - https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-user-export - - @ param response_list_name: the main nested entity to look at inside of response, defualt = response_list_name - @ param sideload_param : parameter variable to include various information to response - more info: https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints - """ - - cursor_field = "updated_at" - response_list_name: str = "users" - sideload_param: str = None - - @staticmethod - def check_start_time_param(requested_start_time: int, value: int = 1): - """ - Requesting users in the future is not allowed, hits 400 - bad request. - We get current UNIX timestamp minus `value` from now(), default = 1 (minute). - - Returns: either close to now UNIX timestamp or previously requested UNIX timestamp. - """ - now = calendar.timegm(pendulum.now().subtract(minutes=value).utctimetuple()) - return now if requested_start_time > now else requested_start_time - - def path(self, **kwargs) -> str: - return f"incremental/{self.response_list_name}.json" - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - """ - Returns next_page_token based on `end_of_stream` parameter inside of response - """ - next_page_token = super().next_page_token(response) - return None if response.json().get(END_OF_STREAM_KEY, False) else next_page_token - - def request_params( - self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs - ) -> MutableMapping[str, Any]: - params = super().request_params(stream_state, next_page_token, **kwargs) - # check "start_time" is not in the future - params["start_time"] = self.check_start_time_param(params["start_time"]) - if self.sideload_param: - params["include"] = self.sideload_param - return params - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - for record in response.json().get(self.response_list_name, []): - yield record - - -class Users(SourceZendeskUserExportStream): +class Users(SourceZendeskIncrementalExportStream): """Users stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-user-export""" + response_list_name: str = "users" class Organizations(SourceZendeskSupportStream): """Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/""" @@ -501,6 +453,7 @@ class Organizations(SourceZendeskSupportStream): class Tickets(SourceZendeskTicketExportStream): """Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based""" + response_list_name: str = "tickets" class TicketComments(SourceZendeskSupportTicketEventsExportStream): """ From 97e548b2de69a53b9bac055ed00228f4f1e961ef Mon Sep 17 00:00:00 2001 From: danieldiamond Date: Wed, 20 Apr 2022 09:04:14 +1000 Subject: [PATCH 7/9] Fix typo from previous contribution --- .../source-zendesk-support/source_zendesk_support/streams.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index 4b82aad6959d..3afd4e34356e 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -446,15 +446,17 @@ class Users(SourceZendeskIncrementalExportStream): response_list_name: str = "users" + class Organizations(SourceZendeskSupportStream): """Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/""" -class Tickets(SourceZendeskTicketExportStream): +class Tickets(SourceZendeskIncrementalExportStream): """Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based""" response_list_name: str = "tickets" + class TicketComments(SourceZendeskSupportTicketEventsExportStream): """ Fetch the TicketComments incrementaly from TicketEvents Export stream From 9efe7c48e47543eacbcf3cd2fc3c471bfeda920e Mon Sep 17 00:00:00 2001 From: danieldiamond Date: Wed, 20 Apr 2022 10:36:30 +1000 Subject: [PATCH 8/9] Update unit tests --- .../connectors/source-zendesk-support/unit_tests/unit_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py index 013feab42103..20847fb78f30 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py @@ -15,7 +15,7 @@ DATETIME_FORMAT, END_OF_STREAM_KEY, BaseSourceZendeskSupportStream, - SourceZendeskTicketExportStream, + SourceZendeskIncrementalExportStream, TicketComments, Tickets, ) @@ -90,7 +90,7 @@ def test_str2unixtime(): def test_check_start_time_param(): expected = 1626936955 start_time = calendar.timegm(pendulum.parse(DATETIME_STR).utctimetuple()) - output = SourceZendeskTicketExportStream.check_start_time_param(start_time) + output = SourceZendeskIncrementalExportStream.check_start_time_param(start_time) assert output == expected From 6e65ec381695b9545ff4904f63b4e2bec082c2ef Mon Sep 17 00:00:00 2001 From: danieldiamond Date: Wed, 20 Apr 2022 13:24:51 +1000 Subject: [PATCH 9/9] Remove enum and default from spec --- .../source-zendesk-support/source_zendesk_support/spec.json | 4 ---- 1 file changed, 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json index 9d37e224c9e2..ede42b469472 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json @@ -33,8 +33,6 @@ "credentials": { "type": "string", "const": "oauth2.0", - "enum": ["oauth2.0"], - "default": "oauth2.0", "order": 0 }, "access_token": { @@ -54,8 +52,6 @@ "credentials": { "type": "string", "const": "api_token", - "enum": ["api_token"], - "default": "api_token", "order": 0 }, "email": {