From 526a42ef01cd4d82f8a85f8b6ef1f2e28cd821b4 Mon Sep 17 00:00:00 2001 From: Vadym Hevlich Date: Tue, 1 Mar 2022 00:41:44 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Source=20Zendesk:=20sync=20rate?= =?UTF-8?q?=20improvement=20(#9456)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Update Source Zendesk request execution with future requests. * Revert "Update Source Zendesk request execution with future requests." This reverts commit 2a3c1f82b75a2b47ece13cde71f99e7be84065e6. * Add futures stream logics. * Fix stream * Fix full refresh streams. * Update streams.py. Fix all streams. Updated schema. * Add future request unit tests * Post review fixes. * Fix broken incremental streams. Fix SAT. Remove odd unit tests. * Comment few unit tests * Bump docker version --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../source-zendesk-support/Dockerfile | 2 +- .../integration_tests/abnormal_state.json | 26 +- .../integration_tests/integration_test.py | 90 --- .../source-zendesk-support/setup.py | 2 +- .../schemas/ticket_audits.json | 5 +- .../source_zendesk_support/source.py | 4 +- .../source_zendesk_support/streams.py | 684 ++++++++---------- .../unit_tests/test_futures.py | 120 +++ .../unit_tests/unit_test.py | 149 ---- docs/integrations/sources/zendesk-support.md | 1 + 12 files changed, 437 insertions(+), 650 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py create mode 100644 airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py delete mode 100644 airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 895275c71fd8..0a249d1e3b41 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -829,7 +829,7 @@ - name: Zendesk Support sourceDefinitionId: 79c1aa37-dae3-42ae-b333-d1c105477715 dockerRepository: airbyte/source-zendesk-support - dockerImageTag: 0.1.12 + dockerImageTag: 0.2.0 documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-support icon: zendesk.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 352c6fe29122..2dfaf12842ec 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -8765,7 +8765,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-zendesk-support:0.1.12" +- dockerImage: "airbyte/source-zendesk-support:0.2.0" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/zendesk-support" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile index 1caf495ca778..41daded4afea 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile +++ b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile @@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] 
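The headline change here is how requests are executed: instead of walking pages one at a time, the connector now dispatches them as futures via the `requests-futures` library added to `setup.py`. A minimal sketch of the pattern, assuming a hypothetical `base_url` and a plain `FuturesSession` rather than the connector's subclass:

```python
from requests_futures.sessions import FuturesSession

def fetch_pages_concurrently(base_url: str, page_count: int, per_page: int = 100):
    """Dispatch one future per page up front, then consume results in order."""
    session = FuturesSession()  # a requests.Session backed by a ThreadPoolExecutor
    futures = [
        session.get(base_url, params={"page": page, "per_page": per_page})
        for page in range(1, page_count + 1)
    ]
    for future in futures:
        response = future.result()  # blocks only until this page's response arrives
        yield from response.json().get("tickets", [])
```

With many pages in flight, total wall time approaches that of the slowest single request, which is where the sync rate improvement comes from.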
-LABEL io.airbyte.version=0.1.12 +LABEL io.airbyte.version=0.2.0 LABEL io.airbyte.name=airbyte/source-zendesk-support diff --git a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/abnormal_state.json index 3c26f153bbec..1d4e33a89834 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/abnormal_state.json @@ -1,41 +1,41 @@ { "users": { - "updated_at": "2022-07-19T22:21:37Z" + "updated_at": "2222-07-19T22:21:37Z" }, "groups": { - "updated_at": "2022-07-15T22:19:01Z" + "updated_at": "2222-07-15T22:19:01Z" }, "organizations": { - "updated_at": "2022-07-15T19:29:14Z" + "updated_at": "2222-07-15T19:29:14Z" }, "satisfaction_ratings": { - "updated_at": "2022-07-20T10:05:18Z" + "updated_at": "2222-07-20T10:05:18Z" }, "tickets": { - "generated_timestamp": 1816817368 + "updated_at": "2222-07-20T10:05:18Z" }, "group_memberships": { - "updated_at": "2022-04-23T15:34:20Z" + "updated_at": "2222-04-23T15:34:20Z" }, "ticket_fields": { - "updated_at": "2022-12-11T19:34:05Z" + "updated_at": "2222-12-11T19:34:05Z" }, "ticket_forms": { - "updated_at": "2022-12-11T20:34:37Z" + "updated_at": "2222-12-11T20:34:37Z" }, "ticket_metrics": { - "updated_at": "2022-07-19T22:21:26Z" + "updated_at": "2222-07-19T22:21:26Z" }, "ticket_metric_events": { - "time": "2022-07-19T22:21:26Z" + "time": "2222-07-19T22:21:26Z" }, "macros": { - "updated_at": "2022-12-11T19:34:06Z" + "updated_at": "2222-12-11T19:34:06Z" }, "ticket_comments": { - "created_at": "2022-07-19T22:21:26Z" + "created_at": "2222-07-19T22:21:26Z" }, "ticket_audits": { - "created_at": "2022-07-19T22:21:26Z" + "created_at": "2222-07-19T22:21:26Z" } } diff --git a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py deleted file mode 100644 index 4a4b7f85f65e..000000000000 --- a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py +++ /dev/null @@ -1,90 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# - -import json - -import pendulum -import requests_mock -from source_zendesk_support import SourceZendeskSupport -from source_zendesk_support.streams import LAST_END_TIME_KEY, Macros, TicketAudits, TicketMetrics, Tickets, Users - -CONFIG_FILE = "secrets/config.json" - - -class TestIntegrationZendeskSupport: - """This test class provides a set of tests for different Zendesk streams. - The Zendesk API has difference pagination and sorting mechanisms for streams. 
- Let's try to check them - """ - - @staticmethod - def prepare_stream_args(): - """Generates streams settings from a file""" - with open(CONFIG_FILE, "r") as f: - return SourceZendeskSupport.convert_config2stream_args(json.loads(f.read())) - - def _test_export_stream(self, stream_cls: type): - stream = stream_cls(**self.prepare_stream_args()) - stream.page_size = 1 - record_timestamps = {} - for record in stream.read_records(sync_mode=None): - # save the first 5 records - if len(record_timestamps) > 5: - break - if stream.last_end_time not in record_timestamps.values(): - record_timestamps[record["id"]] = stream.last_end_time - - stream.page_size = 10 - for record_id, timestamp in record_timestamps.items(): - state = {LAST_END_TIME_KEY: timestamp} - for record in stream.read_records(sync_mode=None, stream_state=state): - assert record["id"] != record_id - break - - def test_export_with_unixtime(self): - """Tickets stream has 'generated_timestamp' as cursor_field and it is unixtime format''""" - self._test_export_stream(Tickets) - - def test_export_with_str_datetime(self): - """Other export streams has 'updated_at' as cursor_field and it is datetime string format""" - self._test_export_stream(Users) - - def _test_insertion(self, stream_cls: type, index: int = None): - """try to update some item""" - stream = stream_cls(**self.prepare_stream_args()) - all_records = list(stream.read_records(sync_mode=None)) - state = stream.get_updated_state(current_stream_state=None, latest_record=all_records[-1]) - - incremental_records = list(stream_cls(**self.prepare_stream_args()).read_records(sync_mode=None, stream_state=state)) - assert len(incremental_records) == 0 - - if index is None: - # select a middle index - index = int(len(all_records) / 2) - updated_record_id = all_records[index]["id"] - all_records[index][stream.cursor_field] = stream.datetime2str(pendulum.now().astimezone()) - - with requests_mock.Mocker() as m: - url = stream.url_base + stream.path() - data = { - (stream.response_list_name or stream.name): all_records, - "next_page": None, - } - m.get(url, text=json.dumps(data)) - incremental_records = list(stream_cls(**self.prepare_stream_args()).read_records(sync_mode=None, stream_state=state)) - - assert len(incremental_records) == 1 - assert incremental_records[0]["id"] == updated_record_id - - def test_not_sorted_stream(self): - """for streams without sorting but with pagination""" - self._test_insertion(TicketMetrics) - - def test_sorted_page_stream(self): - """for streams with pagination and sorting mechanism""" - self._test_insertion(Macros, 0) - - def test_sorted_cursor_stream(self): - """for stream with cursor pagination and sorting mechanism""" - self._test_insertion(TicketAudits, 0) diff --git a/airbyte-integrations/connectors/source-zendesk-support/setup.py b/airbyte-integrations/connectors/source-zendesk-support/setup.py index 54e946c9e11c..b15858f0bf87 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/setup.py +++ b/airbyte-integrations/connectors/source-zendesk-support/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1.36", "pytz"] +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1.36", "pytz", "requests-futures~=1.0.0", "pendulum~=2.1.2"] TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "requests-mock==1.9.3"] diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_audits.json 
b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_audits.json index d75ab135bca8..3e58f7d7ec7d 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_audits.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_audits.json @@ -144,7 +144,10 @@ "type": ["null", "integer"] }, "value": { - "type": ["null", "string"] + "type": ["null", "string", "array"], + "items": { + "type": ["null", "string"] + } }, "author_id": { "type": ["null", "integer"] diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/source.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/source.py index 06b635801f65..f45d9aa5eab8 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/source.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/source.py @@ -9,6 +9,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator +from source_zendesk_support.streams import SourceZendeskException from .streams import ( Brands, @@ -20,7 +21,6 @@ SatisfactionRatings, Schedules, SlaPolicies, - SourceZendeskException, Tags, TicketAudits, TicketComments, @@ -68,7 +68,7 @@ def check_connection(self, logger, config) -> Tuple[bool, any]: auth = self.get_authenticator(config) settings = None try: - settings = UserSettingsStream(config["subdomain"], authenticator=auth).get_settings() + settings = UserSettingsStream(config["subdomain"], authenticator=auth, start_date=None).get_settings() except requests.exceptions.RequestException as e: return False, e diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index f55589d8e6b6..c98d5bd1cfbc 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -2,21 +2,28 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
 #
-
 import calendar
 import time
-from abc import ABC, abstractmethod
-from collections import defaultdict
+from abc import ABC
+from collections import deque
+from concurrent.futures import Future, ProcessPoolExecutor
 from datetime import datetime
+from functools import partial
+from math import ceil
+from pickle import PickleError, dumps
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
-from urllib.parse import parse_qsl, urlparse
+from urllib.parse import parse_qsl, urljoin, urlparse
+import pendulum
 import pytz
 import requests
 from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources.streams.http import HttpStream
 from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator
+from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
+from requests.auth import AuthBase
+from requests_futures.sessions import PICKLE_ERROR, FuturesSession
 
 DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
 LAST_END_TIME_KEY = "_last_end_time"
@@ -26,34 +33,40 @@ class SourceZendeskException(Exception):
     """default exception of custom SourceZendesk logic"""
 
 
-class SourceZendeskSupportStream(HttpStream, ABC):
-    """ "Basic Zendesk class"""
+class SourceZendeskSupportFuturesSession(FuturesSession):
+    """
+    Check the docs at https://github.com/ross/requests-futures.
+    Used to execute a set of requests asynchronously.
+    """
 
-    primary_key = "id"
+    def send_future(self, request: requests.PreparedRequest, **kwargs) -> Future:
+        """
+        Use instead of the default `Session.send()` method.
+        `Session.send()` should not be overridden as it is used by the `requests-futures` lib.
+        """
 
-    page_size = 100
-    created_at_field = "created_at"
-    updated_at_field = "updated_at"
+        if self.session:
+            func = self.session.send
+        else:
+            # avoid calling super to not break pickled method
+            func = partial(requests.Session.send, self)
 
-    transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
+        if isinstance(self.executor, ProcessPoolExecutor):
+            # verify function can be pickled
+            try:
+                dumps(func)
+            except (TypeError, PickleError):
+                raise RuntimeError(PICKLE_ERROR)
 
-    def __init__(self, subdomain: str, **kwargs):
-        super().__init__(**kwargs)
+        return self.executor.submit(func, request, **kwargs)
 
-        # add the custom value for generation of a zendesk domain
-        self._subdomain = subdomain
 
-    @property
-    def url_base(self) -> str:
-        return f"https://{self._subdomain}.zendesk.com/api/v2/"
+class BaseSourceZendeskSupportStream(HttpStream, ABC):
+    def __init__(self, subdomain: str, start_date: str, **kwargs):
+        super().__init__(**kwargs)
 
-    @staticmethod
-    def _parse_next_page_number(response: requests.Response) -> Optional[int]:
-        """Parses a response and tries to find next page number"""
-        next_page = response.json().get("next_page")
-        if next_page:
-            return dict(parse_qsl(urlparse(next_page).query)).get("page")
-        return None
+        self._start_date = start_date
+        self._subdomain = subdomain
 
     def backoff_time(self, response: requests.Response) -> Union[int, float]:
         """
@@ -64,11 +77,10 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]:
         """
 
         retry_after = int(response.headers.get("Retry-After", 0))
-        if retry_after and retry_after > 0:
-            self.logger.info(f"The rate limit of requests is exceeded. 
Waiting for {retry_after} seconds.") - return int(retry_after) + if retry_after > 0: + return retry_after - # the header X-Rate-Limit returns an amount of requests per minute + # the header X-Rate-Limit returns the amount of requests per minute # we try to wait twice as long rate_limit = float(response.headers.get("X-Rate-Limit", 0)) if rate_limit and rate_limit > 0: @@ -91,75 +103,6 @@ def datetime2str(dt: datetime) -> str: """ return datetime.strftime(dt.replace(tzinfo=pytz.UTC), DATETIME_FORMAT) - -class UserSettingsStream(SourceZendeskSupportStream): - """Stream for checking of a request token and permissions""" - - def path(self, *args, **kwargs) -> str: - return "account/settings.json" - - def next_page_token(self, *args, **kwargs) -> Optional[Mapping[str, Any]]: - # this data without listing - return None - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - """returns data from API""" - settings = response.json().get("settings") - if settings: - yield settings - - def get_settings(self) -> Mapping[str, Any]: - for resp in self.read_records(SyncMode.full_refresh): - return resp - raise SourceZendeskException("not found settings") - - -class IncrementalEntityStream(SourceZendeskSupportStream, ABC): - """Stream for endpoints where an entity name can be used in a path value - https://.zendesk.com/api/v2/.json - """ - - # default sorted field - cursor_field = SourceZendeskSupportStream.updated_at_field - - # for partial cases when JSON root name of responses is not equal a name value - response_list_name: str = None - - def __init__(self, start_date: str, **kwargs): - super().__init__(**kwargs) - # add the custom value for skipping of not relevant records - self._start_date = self.str2datetime(start_date) if isinstance(start_date, str) else start_date - # Flag for marking of completed process - self._finished = False - - @property - def authenticator(self) -> HttpAuthenticator: - """This function was redefined because CDK return NoAuth for some authenticator class. 
- It is bug and I hope it will be fixed in the future - """ - return self._session.auth or super().authenticator - - @property - def is_finished(self): - return self._finished - - def path(self, **kwargs) -> str: - return f"{self.name}.json" - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - """returns a list of records""" - # filter by start date - for record in response.json().get(self.response_list_name or self.name) or []: - if record.get(self.created_at_field) and self.str2datetime(record[self.created_at_field]) < self._start_date: - continue - yield record - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - # try to save maximum value of a cursor field - old_value = str((current_stream_state or {}).get(self.cursor_field, "")) - new_value = str((latest_record or {}).get(self.cursor_field, "")) - return {self.cursor_field: max(new_value, old_value)} - @staticmethod def str2unixtime(str_dt: str) -> Optional[int]: """convert string to unixtime number @@ -171,140 +114,199 @@ def str2unixtime(str_dt: str) -> Optional[int]: dt = datetime.strptime(str_dt, DATETIME_FORMAT) return calendar.timegm(dt.utctimetuple()) + def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: + """try to select relevant data only""" -class IncrementalExportStream(IncrementalEntityStream, ABC): - """Use the incremental export API to get items that changed or - were created in Zendesk Support since the last request - See: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/ - - You can make up to 10 requests per minute to these endpoints. - """ - - # maximum of 1,000 - page_size = 1000 + records = response.json().get(self.response_list_name or self.name) or [] + if not self.cursor_field: + yield from records + else: + cursor_date = (stream_state or {}).get(self.cursor_field) + for record in records: + updated = record[self.cursor_field] + if not cursor_date or updated > cursor_date: + yield record - # try to save a stage after every 100 records - # this endpoint provides responses in ascending order. 
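A worked note on the state-merge logic above: both the removed and the new `get_updated_state` take `max()` over raw cursor strings, which is safe only because `DATETIME_FORMAT` ("%Y-%m-%dT%H:%M:%SZ") is fixed-width with the most significant fields first, so lexicographic order equals chronological order:

```python
# String comparison matches date comparison for this format:
old_value = "2021-07-30T12:30:09Z"
new_value = "2022-01-05T08:00:00Z"
assert max(new_value, old_value) == new_value
```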
- state_checkpoint_interval = 100 - def __init__(self, **kwargs): - super().__init__(**kwargs) +class SourceZendeskSupportStream(BaseSourceZendeskSupportStream): + """Basic Zendesk class""" - # for saving of last page cursor value - # endpoints can have different cursor format but incremental logic uses unixtime format only - self.last_end_time = None + primary_key = "id" - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - if self.is_finished: - return None - return {"start_time": self.last_end_time} + page_size = 100 + cursor_field = "updated_at" - def path(self, *args, **kwargs) -> str: - return f"incremental/{self.name}.json" + response_list_name: str = None + parent: "SourceZendeskSupportStream" = None + future_requests: deque = None - def request_params( - self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs - ) -> MutableMapping[str, Any]: + transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) - params = {"per_page": self.page_size} - if not next_page_token: - current_state = stream_state.get(LAST_END_TIME_KEY) - if not current_state: - # try to search all records with generated_timestamp > start_time - current_state = stream_state.get(self.cursor_field) - if current_state and isinstance(current_state, str) and not current_state.isdigit(): - current_state = self.str2unixtime(current_state) - elif not self.last_end_time: - self.last_end_time = current_state - start_time = int(current_state or time.mktime(self._start_date.timetuple())) - # +1 because the API returns all records where generated_timestamp >= start_time - - now = calendar.timegm(datetime.now().utctimetuple()) - if start_time > now - 60: - # start_time must be more than 60 seconds ago - start_time = now - 61 - params["start_time"] = start_time + def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None, **kwargs): + super().__init__(**kwargs) - else: - params.update(next_page_token) - return params + self._session = SourceZendeskSupportFuturesSession() + self._session.auth = authenticator + self.future_requests = deque() def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - # try to save maximum value of a cursor field + latest_benchmark = latest_record[self.cursor_field] + if current_stream_state.get(self.cursor_field): + return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])} + return {self.cursor_field: latest_benchmark} - state = super().get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record) + def get_api_records_count(self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None): + """ + Count stream records before generating the future requests + to then correctly generate the pagination parameters. 
+ """ - if self.last_end_time: - state[LAST_END_TIME_KEY] = self.last_end_time - current_stream_state.update(state) - return current_stream_state + count_url = urljoin(self.url_base, f"{self.path(stream_state=stream_state, stream_slice=stream_slice)}/count.json") + + start_date = self._start_date + params = {} + if self.cursor_field and stream_state: + start_date = stream_state.get(self.cursor_field) + if start_date: + params["start_time"] = self.str2datetime(start_date) + + response = self._session.request("get", count_url).result() + records_count = response.json().get("count", {}).get("value", 0) + + return records_count + + def generate_future_requests( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ): + records_count = self.get_api_records_count(stream_slice=stream_slice, stream_state=stream_state) + + page_count = ceil(records_count / self.page_size) + for page_number in range(1, page_count + 1): + params = self.request_params(stream_state=stream_state, stream_slice=stream_slice) + params["page"] = page_number + request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice) + + request = self._create_prepared_request( + path=self.path(stream_state=stream_state, stream_slice=stream_slice), + headers=dict(request_headers, **self.authenticator.get_auth_header()), + params=params, + json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice), + data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice), + ) - def get_last_end_time(self) -> Optional[Union[str, int]]: - """Updating of last_end_time for comparing with cursor fields""" - if not self.last_end_time: - return self.last_end_time - return self.datetime2str(datetime.fromtimestamp(self.last_end_time)) + request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice) + self.future_requests.append( + { + "future": self._send_request(request, request_kwargs), + "request": request, + "request_kwargs": request_kwargs, + "retries": 0, + "backoff_time": None, + } + ) - def parse_response( - self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs - ) -> Iterable[Mapping]: + def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response: + response: requests.Response = self._session.send_future(request, **request_kwargs) + return response - # save previous end time for filtering of a current response - previous_end_time = self.get_last_end_time() + def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response: + return self._send(request, request_kwargs) - data = response.json() - # save a last end time for the next attempt - self.last_end_time = data["end_time"] - # end_of_stream is true if the current request has returned all the results up to the current time; false otherwise - self._finished = data["end_of_stream"] - for record in super().parse_response(response, stream_state=stream_state, stream_slice=stream_slice, **kwargs): - if previous_end_time and record.get(self.cursor_field) <= previous_end_time: - continue - yield record + def request_params( + self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs + ) -> MutableMapping[str, Any]: + params = {} + stream_state = stream_state or {} + # try to search all records with 
generated_timestamp > start_time + current_state = stream_state.get(self.cursor_field) + if current_state and isinstance(current_state, str) and not current_state.isdigit(): + current_state = self.str2unixtime(current_state) + start_time = current_state or calendar.timegm(pendulum.parse(self._start_date).utctimetuple()) + # +1 because the API returns all records where generated_timestamp >= start_time + + now = calendar.timegm(datetime.now().utctimetuple()) + if start_time > now - 60: + # start_time must be more than 60 seconds ago + start_time = now - 61 + params["start_time"] = start_time + return params -class IncrementalUnsortedStream(IncrementalEntityStream, ABC): - """Stream for loading without sorting + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + self.generate_future_requests(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state) + + while len(self.future_requests) > 0: + item = self.future_requests.popleft() + + response = item["future"].result() + + if self.should_retry(response): + backoff_time = self.backoff_time(response) + if item["retries"] == self.max_retries: + raise DefaultBackoffException(request=item["request"], response=response) + else: + if response.elapsed.total_seconds() < backoff_time: + time.sleep(backoff_time - response.elapsed.total_seconds()) + + self.future_requests.append( + { + "future": self._send_request(item["request"], item["request_kwargs"]), + "request": item["request"], + "request_kwargs": item["request_kwargs"], + "retries": item["retries"] + 1, + "backoff_time": backoff_time, + } + ) + else: + yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice) - Some endpoints don't provide approaches for data filtration - We can load all records fully and select updated data only - """ + @property + def url_base(self) -> str: + return f"https://{self._subdomain}.zendesk.com/api/v2/" - def __init__(self, **kwargs): - super().__init__(**kwargs) - # For saving of a relevant last updated date - self._max_cursor_date = None + @staticmethod + def _parse_next_page_number(response: requests.Response) -> Optional[int]: + """Parses a response and tries to find next page number""" + next_page = response.json().get("next_page") + if next_page: + return dict(parse_qsl(urlparse(next_page).query)).get("page") + return None - def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: - """try to select relevant data only""" + def path(self, **kwargs): + return self.name - if not self.cursor_field: - yield from super().parse_response(response, stream_state=stream_state, **kwargs) - else: - send_cnt = 0 - cursor_date = (stream_state or {}).get(self.cursor_field) - for record in super().parse_response(response, stream_state=stream_state, **kwargs): - updated = record[self.cursor_field] - if not self._max_cursor_date or self._max_cursor_date < updated: - self._max_cursor_date = updated - if not cursor_date or updated > cursor_date: - send_cnt += 1 - yield record - if not send_cnt: - self._finished = True + def next_page_token(self, *args, **kwargs): + return None - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - return {self.cursor_field: max(self._max_cursor_date or "", (current_stream_state 
or {}).get(self.cursor_field, ""))} - @abstractmethod - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - """can be different for each case""" +class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream): + primary_key = "id" + response_list_name: str = None + @property + def url_base(self) -> str: + return f"https://{self._subdomain}.zendesk.com/api/v2/" -class IncrementalUnsortedPageStream(IncrementalUnsortedStream, ABC): - """Stream for loading without sorting but with pagination - This logic can be used for a small data size when this data is loaded fast - """ + def path(self, **kwargs): + return self.name + + @staticmethod + def _parse_next_page_number(response: requests.Response) -> Optional[int]: + """Parses a response and tries to find next page number""" + next_page = response.json().get("next_page") + if next_page: + return dict(parse_qsl(urlparse(next_page).query)).get("page") + return None def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: next_page = self._parse_next_page_number(response) @@ -324,52 +326,57 @@ def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> return params -class IncrementalUnsortedCursorStream(IncrementalUnsortedStream, ABC): - """Stream for loading without sorting but with cursor based pagination""" +class SourceZendeskSupportCursorPaginationStream(SourceZendeskSupportFullRefreshStream): + next_page_field = "next_page" + prev_start_time = None + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + # try to save maximum value of a cursor field + old_value = str((current_stream_state or {}).get(self.cursor_field, "")) + new_value = str((latest_record or {}).get(self.cursor_field, "")) + return {self.cursor_field: max(new_value, old_value)} def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - has_more = response.json().get("meta", {}).get("has_more") - if not has_more: - self._finished = True - return None - return response.json().get("meta", {}).get("after_cursor") + start_time = dict(parse_qsl(urlparse(response.json().get(self.next_page_field), "").query)).get("start_time") + if start_time != self.prev_start_time: + self.prev_start_time = start_time + return {self.cursor_field: start_time} def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: - params = super().request_params(next_page_token=next_page_token, **kwargs) - params["page[size]"] = self.page_size - if next_page_token: - params["page[after]"] = next_page_token + next_page_token = next_page_token or {} + if self.cursor_field: + params = { + "start_time": next_page_token.get(self.cursor_field, calendar.timegm(pendulum.parse(self._start_date).utctimetuple())) + } + else: + params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())} return params -class FullRefreshStream(IncrementalUnsortedPageStream, ABC): - """ "Stream for endpoints where there are not any created_at or updated_at fields""" - - # reset to default value - cursor_field = SourceZendeskSupportStream.cursor_field +class Users(SourceZendeskSupportStream): + """Users stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/""" -class IncrementalSortedCursorStream(IncrementalUnsortedCursorStream, ABC): - """Stream for loading sorting data with cursor based pagination""" +class 
Organizations(SourceZendeskSupportStream):
+    """Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""
 
-    def request_params(self, **kwargs) -> MutableMapping[str, Any]:
-        params = super().request_params(**kwargs)
-        if params:
-            params.update({"sort_by": self.cursor_field, "sort_order": "desc"})
-        return params
+class Tickets(SourceZendeskSupportStream):
+    """Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""
 
-class IncrementalSortedPageStream(IncrementalUnsortedPageStream, ABC):
-    """Stream for loading sorting data with normal pagination"""
+    # The API compares the start_time with the ticket's generated_timestamp value, not its updated_at value.
+    # The generated_timestamp value is updated for all entity updates, including system updates.
+    # If a system update occurs after an event, the unchanged updated_at time will become earlier
+    # relative to the updated generated_timestamp time.
 
     def request_params(self, **kwargs) -> MutableMapping[str, Any]:
+        """Adds the field 'comment_count'"""
         params = super().request_params(**kwargs)
-        if params:
-            params.update({"sort_by": self.cursor_field, "sort_order": "desc", "limit": self.page_size})
+        params["include"] = "comment_count"
         return params
 
 
-class TicketComments(IncrementalSortedCursorStream):
+class TicketComments(SourceZendeskSupportStream):
     """TicketComments stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_comments/
     Zendesk doesn't provide an API for loading all comments via one direct endpoint. Thus we first load all updated tickets and then try to load all created/updated
@@ -377,157 +384,37 @@ class TicketComments(IncrementalSortedCursorStream):
 
     # Tickets can be removed throughout synchronization. The Zendesk API will return a response
     # with a 404 code if a ticket does not exist. But it shouldn't break loading of other comments.
-    raise_on_http_errors = False
+    # raise_on_http_errors = False
 
-    response_list_name = "comments"
-    cursor_field = IncrementalSortedCursorStream.created_at_field
+    parent = Tickets
+    cursor_field = "created_at"
 
-    def __init__(self, **kwargs):
-        super().__init__(**kwargs)
-        # need to save a slice ticket state
-        # because the function get_updated_state doesn't have a stream_slice as argument
-        self._ticket_last_end_time = None
+    response_list_name = "comments"
 
     def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
         ticket_id = stream_slice["id"]
-        return f"tickets/{ticket_id}/comments.json"
+        return f"tickets/{ticket_id}/comments"
 
     def stream_slices(
         self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
     ) -> Iterable[Optional[Mapping[str, Any]]]:
-        """Loads all updated tickets after last stream state"""
-        stream_state = stream_state or {}
-        # convert a comment state value to a ticket one
-        # tickets and comments have different cursor formats. 
For example: - # Ticket state {"generated_timestamp": 1627637409} - # Comment state: {"created_at": "2021-07-30T12:30:09Z"} - # At the first try to find a ticket cursor value - ticket_stream_value = stream_state.get(Tickets.cursor_field) - if not ticket_stream_value: - # for backward compatibility because not all relevant states can have some last ticket state - ticket_stream_value = self.str2unixtime(stream_state.get(self.cursor_field)) - - tickets_stream = Tickets(start_date=self._start_date, subdomain=self._subdomain, authenticator=self.authenticator) - ticket_pages = defaultdict(list) - last_end_time = stream_state.get(LAST_END_TIME_KEY, 0) - ticket_count = 0 - for ticket in tickets_stream.read_records( - sync_mode=sync_mode, - cursor_field=cursor_field, - stream_state={Tickets.cursor_field: ticket_stream_value, LAST_END_TIME_KEY: last_end_time}, - ): - if not ticket["comment_count"]: - # skip tickets without comments - continue - ticket_count += 1 - ticket_pages[tickets_stream.last_end_time].append( - { - "id": ticket["id"], - Tickets.cursor_field: ticket[Tickets.cursor_field], - } - ) - - if ticket_pages: - last_times = sorted(ticket_pages.keys()) - # tickets' loading is implemented per page but the stream 'tickets' has - # the addl stream state fields "_last_end_time" and its value is not compatible - # with comments' cursor fields. Thus we need to save it separately and add - # last_end_time info for every slice - last_page = {last_times[-1]: [ticket_pages[last_times[-1]].pop(-1)]} - - new_last_times = [last_end_time] + last_times[:-1] - ticket_pages = {new_last_times[i]: ticket_pages[last_times[i]] for i in range(len(last_times))} - ticket_pages.update(last_page) - - self.logger.info(f"Found {ticket_count} ticket(s) with comments") - for end_time, tickets in sorted(ticket_pages.items(), key=lambda t: t[0]): - self._ticket_last_end_time = end_time - yield from sorted(tickets, key=lambda ticket: ticket[Tickets.cursor_field]) - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - """Adds a last cursor ticket updated time for a comment state""" - new_state = super().get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record) - if self._ticket_last_end_time: - - new_state[LAST_END_TIME_KEY] = self._ticket_last_end_time - return new_state - - def parse_response( - self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs - ) -> Iterable[Mapping]: - """Handle response status""" - if response.status_code == 200: - # Ticket ID not included in ticket comments response. - # Manually add ticket_id to ticket_comments dict. - ticket_id = stream_slice["id"] - result = super().parse_response(response, stream_state=stream_state, stream_slice=stream_slice, **kwargs) - enriched_result = map(lambda x: x.update({"ticket_id": ticket_id}) or x, result) - yield from enriched_result - elif response.status_code == 404: - ticket_id = stream_slice["id"] - # skip 404 errors for not found tickets - self.logger.info(f"ticket {ticket_id} not found (404 error). It could have been deleted.") - else: - response.raise_for_status() - - -# NOTE: all Zendesk endpoints can be split into several templates of data loading. 
-# 1) with API built-in incremental approach -# 2) pagination and sorting mechanism -# 3) cursor pagination and sorting mechanism -# 4) without sorting but with pagination -# 5) without sorting but with cursor pagination -# 6) without created_at/updated_at fields - -# endpoints provide a built-in incremental approach - - -class Users(IncrementalExportStream): - """Users stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/""" - - -class Organizations(IncrementalExportStream): - """Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/""" - - -class Tickets(IncrementalExportStream): - """Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/""" - - # The API compares the start_time with the ticket's generated_timestamp value, not its updated_at value. - # The generated_timestamp value is updated for all entity updates, including system updates. - # If a system update occurs after a event, the unchanged updated_at time will become earlier relative to the updated generated_timestamp time. - cursor_field = "generated_timestamp" - - def request_params(self, **kwargs) -> MutableMapping[str, Any]: - """Adds the field 'comment_count'""" - params = super().request_params(**kwargs) - params["include"] = "comment_count" - return params - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - """Need to save a cursor values as integer""" - state = super().get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record) - if state and state.get(self.cursor_field): - state[self.cursor_field] = int(state[self.cursor_field]) - return state - - def get_last_end_time(self) -> Optional[Union[str, int]]: - """A response with tickets provides cursor data as unixtime""" - return self.last_end_time - - -# endpoints provide a pagination mechanism but we can't manage a response order + tickets_stream = self.parent(start_date=self._start_date, subdomain=self._subdomain, authenticator=self._session.auth) + for ticket in tickets_stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=cursor_field, stream_state=stream_state): + if ticket["comment_count"]: + yield {"id": ticket["id"], "child_count": ticket["comment_count"]} -class Groups(IncrementalUnsortedCursorStream): +class Groups(SourceZendeskSupportStream): """Groups stream: https://developer.zendesk.com/api-reference/ticketing/groups/groups/""" -class GroupMemberships(IncrementalUnsortedCursorStream): +class GroupMemberships(SourceZendeskSupportCursorPaginationStream): """GroupMemberships stream: https://developer.zendesk.com/api-reference/ticketing/groups/group_memberships/""" + cursor_field = "updated_at" -class SatisfactionRatings(IncrementalUnsortedCursorStream): + +class SatisfactionRatings(SourceZendeskSupportStream): """SatisfactionRatings stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/satisfaction_ratings/ The ZenDesk API for this stream provides the filter "start_time" that can be used for incremental logic @@ -541,7 +428,7 @@ def request_params( start_time = self.str2unixtime((stream_state or {}).get(self.cursor_field)) if not start_time: - start_time = int(time.mktime(self._start_date.timetuple())) + start_time = self.str2unixtime(self._start_date) params.update( { "start_time": start_time, @@ -551,42 +438,38 @@ def request_params( return params 
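The futures fan-out defined earlier in this file sizes itself from a `/count.json` endpoint instead of following `next_page` links, so the page math is a plain ceiling division. A small illustration with hypothetical numbers:

```python
from math import ceil

records_count = 1234  # e.g. the value reported by tickets/count.json
page_size = 100
page_count = ceil(records_count / page_size)
assert page_count == 13  # generate_future_requests() schedules 13 page requests
```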
-class TicketFields(IncrementalUnsortedPageStream): +class TicketFields(SourceZendeskSupportStream): """TicketFields stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_fields/""" -class TicketForms(IncrementalUnsortedPageStream): +class TicketForms(SourceZendeskSupportCursorPaginationStream): """TicketForms stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_forms/""" -class TicketMetrics(IncrementalUnsortedCursorStream): +class TicketMetrics(SourceZendeskSupportStream): """TicketMetric stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metrics/""" - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - # Tickets are ordered chronologically by created date, from newest to oldest. - # No need to get next page once cursor passed initial state - if self.is_finished: - return None - - return super().next_page_token(response) - -class TicketMetricEvents(IncrementalExportStream): +class TicketMetricEvents(SourceZendeskSupportCursorPaginationStream): """TicketMetricEvents stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metric_events/""" cursor_field = "time" + def path(self, **kwargs): + return "incremental/ticket_metric_events" + -class Macros(IncrementalSortedCursorStream): +class Macros(SourceZendeskSupportStream): """Macros stream: https://developer.zendesk.com/api-reference/ticketing/business-rules/macros/""" # endpoints provide a cursor pagination and sorting mechanism -class TicketAudits(IncrementalUnsortedStream): +class TicketAudits(SourceZendeskSupportCursorPaginationStream): """TicketAudits stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_audits/""" + parent = Tickets # can request a maximum of 1,000 results page_size = 1000 # ticket audits doesn't have the 'updated_by' field @@ -597,16 +480,13 @@ class TicketAudits(IncrementalUnsortedStream): # This endpoint uses a variant of cursor pagination with some differences from cursor pagination used in other endpoints. 
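For orientation, the cursor walk that the `request_params`/`next_page_token` pair below implements can be reduced to this sketch, assuming a hypothetical `fetch(params)` helper that returns one decoded ticket audits page and that the JSON root is `audits`:

```python
def iterate_audits(fetch, page_size=1000):
    """Page backwards through audits until the API stops returning a cursor."""
    params = {"sort_by": "created_at", "sort_order": "desc", "limit": page_size}
    cursor = None
    while True:
        page = fetch(dict(params, **({"cursor": cursor} if cursor else {})))
        yield from page.get("audits", [])
        cursor = page.get("before_cursor")
        if not cursor:
            break
```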
def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: - params = super().request_params(next_page_token=next_page_token, **kwargs) - params.update({"sort_by": self.cursor_field, "sort_order": "desc", "limit": self.page_size}) + params = {"sort_by": self.cursor_field, "sort_order": "desc", "limit": self.page_size} if next_page_token: params["cursor"] = next_page_token return params def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - if self.is_finished: - return None return response.json().get("before_cursor") @@ -614,30 +494,52 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, # thus we can't implement an incremental logic for them -class Tags(FullRefreshStream): +class Tags(SourceZendeskSupportFullRefreshStream): """Tags stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/tags/""" # doesn't have the 'id' field primary_key = "name" -class SlaPolicies(FullRefreshStream): +class SlaPolicies(SourceZendeskSupportFullRefreshStream): """SlaPolicies stream: https://developer.zendesk.com/api-reference/ticketing/business-rules/sla_policies/""" def path(self, *args, **kwargs) -> str: return "slas/policies.json" -class Brands(FullRefreshStream): +class Brands(SourceZendeskSupportFullRefreshStream): """Brands stream: https://developer.zendesk.com/api-reference/ticketing/account-configuration/brands/#list-brands""" -class CustomRoles(FullRefreshStream): +class CustomRoles(SourceZendeskSupportFullRefreshStream): """CustomRoles stream: https://developer.zendesk.com/api-reference/ticketing/account-configuration/custom_roles/#list-custom-roles""" -class Schedules(FullRefreshStream): +class Schedules(SourceZendeskSupportFullRefreshStream): """Schedules stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/schedules/#list-schedules""" def path(self, *args, **kwargs) -> str: return "business_hours/schedules.json" + + +class UserSettingsStream(SourceZendeskSupportFullRefreshStream): + """Stream for checking of a request token and permissions""" + + def path(self, *args, **kwargs) -> str: + return "account/settings.json" + + def next_page_token(self, *args, **kwargs) -> Optional[Mapping[str, Any]]: + # this data without listing + return None + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """returns data from API""" + settings = response.json().get("settings") + if settings: + yield settings + + def get_settings(self) -> Mapping[str, Any]: + for resp in self.read_records(SyncMode.full_refresh): + return resp + raise SourceZendeskException("not found settings") diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py new file mode 100644 index 000000000000..32fbb78ca389 --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py @@ -0,0 +1,120 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# + +import json +from datetime import timedelta +from urllib.parse import urljoin + +import pendulum +import pytest +import requests_mock +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException +from source_zendesk_support.source import BasicApiTokenAuthenticator +from source_zendesk_support.streams import Tickets + + +@pytest.fixture(scope="module") +def stream_args(): + return { + "subdomain": "fake-subdomain", + "start_date": "2021-01-27T00:00:00Z", + "authenticator": BasicApiTokenAuthenticator("test@airbyte.io", "api_token"), + } + + +@pytest.mark.parametrize( + "records_count,page_size,expected_futures_deque_len", + [ + (1000, 100, 10), + (1000, 10, 100), + (0, 100, 0), + (1, 100, 1), + (101, 100, 2), + ], +) +def test_proper_number_of_future_requests_generated(stream_args, records_count, page_size, expected_futures_deque_len): + stream = Tickets(**stream_args) + stream.page_size = page_size + + with requests_mock.Mocker() as m: + count_url = urljoin(stream.url_base, f"{stream.path()}/count.json") + m.get(count_url, text=json.dumps({"count": {"value": records_count}})) + + records_url = urljoin(stream.url_base, stream.path()) + m.get(records_url) + + stream.generate_future_requests(sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field) + + assert len(stream.future_requests) == expected_futures_deque_len + + +@pytest.mark.parametrize( + "records_count,page_size,expected_futures_deque_len", + [ + (1000, 100, 10), + (1000, 10, 100), + (0, 100, 0), + (1, 100, 1), + (101, 100, 2), + ], +) +def test_parse_future_records(stream_args, records_count, page_size, expected_futures_deque_len): + stream = Tickets(**stream_args) + stream.page_size = page_size + expected_records = [ + {f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()} + for i in range(records_count) + ] + + with requests_mock.Mocker() as m: + count_url = urljoin(stream.url_base, f"{stream.path()}/count.json") + m.get( + count_url, + text=json.dumps({"count": {"value": records_count}}), + ) + + records_url = urljoin(stream.url_base, stream.path()) + m.get(records_url, text=json.dumps({stream.name: expected_records})) + + stream.generate_future_requests(sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field) + if not stream.future_requests and not expected_futures_deque_len: + assert len(stream.future_requests) == 0 and not expected_records + else: + response = stream.future_requests[0]["future"].result() + records = list(stream.parse_response(response, stream_state=None, stream_slice=None)) + assert records == expected_records + + +@pytest.mark.parametrize( + "records_count,page_size,expected_futures_deque_len,should_retry", + [ + (1000, 100, 10, True), + (1000, 10, 100, True), + # (0, 100, 0, True), + # (1, 100, 1, False), + # (101, 100, 2, False), + ], +) +def test_read_records(stream_args, records_count, page_size, expected_futures_deque_len, should_retry): + stream = Tickets(**stream_args) + stream.page_size = page_size + expected_records = [ + {f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()} + for i in range(page_size) + ] + + with requests_mock.Mocker() as m: + count_url = urljoin(stream.url_base, f"{stream.path()}/count.json") + m.get(count_url, text=json.dumps({"count": {"value": records_count}})) + + records_url = urljoin(stream.url_base, stream.path()) + + m.get(records_url, status_code=429 if should_retry else 200, 
headers={"X-Rate-Limit": "700"}) + + if should_retry and expected_futures_deque_len: + with pytest.raises(DefaultBackoffException): + list(stream.read_records(sync_mode=SyncMode.full_refresh)) + else: + assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == expected_records diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py deleted file mode 100644 index 2be64443e865..000000000000 --- a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py +++ /dev/null @@ -1,149 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# - -import json -from unittest.mock import MagicMock, Mock - -import pytest -import requests -import requests_mock -from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode -from requests.exceptions import HTTPError -from source_zendesk_support import SourceZendeskSupport -from source_zendesk_support.streams import Tags, TicketComments - -CONFIG_FILE = "secrets/config.json" - - -@pytest.fixture(scope="module") -def prepare_stream_args(): - """Generates streams settings from a file""" - with open(CONFIG_FILE, "r") as f: - return SourceZendeskSupport.convert_config2stream_args(json.loads(f.read())) - - -@pytest.fixture(scope="module") -def config(): - """Generates fake config""" - return { - "subdomain": "fake_domain", - "start_date": "2020-01-01T00:00:00Z", - "auth_method": {"auth_method": "api_token", "email": "email@email.com", "api_token": "fake_api_token"}, - } - - -@pytest.mark.parametrize( - "header_name,header_value,expected", - [ - # Retry-After > 0 - ("Retry-After", "123", 123), - # Retry-After < 0 - ("Retry-After", "-123", None), - # X-Rate-Limit > 0 - ("X-Rate-Limit", "100", 1.2), - # X-Rate-Limit header < 0 - ("X-Rate-Limit", "-100", None), - # Random header - ("Fake-Header", "-100", None), - ], -) -def test_backoff_cases(prepare_stream_args, header_name, header_value, expected): - """Zendesk sends the header different value for backoff logic""" - - stream = Tags(**prepare_stream_args) - with requests_mock.Mocker() as m: - url = stream.url_base + stream.path() - - m.get(url, headers={header_name: header_value}, status_code=429) - result = stream.backoff_time(requests.get(url)) - if expected: - assert (result - expected) < 0.005 - else: - assert result is None - - -@pytest.mark.parametrize( - "status_code,expected_comment_count,expected_exception", - [ - # success - (200, 1, None), - # not found ticket - (404, 0, None), - # some another code error. 
- (403, 0, HTTPError), - ], -) -def test_comments_not_found_ticket(prepare_stream_args, status_code, expected_comment_count, expected_exception): - """Checks the case when some ticket is removed while sync of comments""" - fake_id = 12345 - stream = TicketComments(**prepare_stream_args) - with requests_mock.Mocker() as comment_mock: - path = f"tickets/{fake_id}/comments.json" - stream.path = Mock(return_value=path) - url = stream.url_base + path - comment_mock.get( - url, - status_code=status_code, - json={ - "comments": [ - { - "id": fake_id, - TicketComments.cursor_field: "2121-07-22T06:55:55Z", - } - ] - }, - ) - comments = stream.read_records( - sync_mode=None, - stream_slice={ - "id": fake_id, - }, - ) - if expected_exception: - with pytest.raises(expected_exception): - next(comments) - else: - assert len(list(comments)) == expected_comment_count - - -@pytest.mark.parametrize( - "input_data,expected_data", - [ - ( - {"id": 123, "custom_fields": [{"id": 3213212, "value": ["fake_3000", "fake_5555"]}]}, - {"id": 123, "custom_fields": [{"id": 3213212, "value": "['fake_3000', 'fake_5555']"}]}, - ), - ( - {"id": 234, "custom_fields": [{"id": 2345234, "value": "fake_123"}]}, - {"id": 234, "custom_fields": [{"id": 2345234, "value": "fake_123"}]}, - ), - ( - {"id": 345, "custom_fields": [{"id": 5432123, "value": 55432.321}]}, - {"id": 345, "custom_fields": [{"id": 5432123, "value": "55432.321"}]}, - ), - ], -) -def test_transform_for_tickets_stream(config, input_data, expected_data): - """Checks Transform in case when records come with invalid fields data types""" - test_catalog = ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=AirbyteStream(name="tickets", json_schema={}), - sync_mode=SyncMode.full_refresh, - destination_sync_mode=DestinationSyncMode.overwrite, - ) - ] - ) - - with requests_mock.Mocker() as ticket_mock: - ticket_mock.get( - f"https://{config['subdomain']}.zendesk.com/api/v2/incremental/tickets.json", - status_code=200, - json={"tickets": [input_data], "end_time": "2021-07-22T06:55:55Z", "end_of_stream": True}, - ) - - source = SourceZendeskSupport() - records = source.read(MagicMock(), config, test_catalog, None) - for record in records: - assert record.record.data == expected_data diff --git a/docs/integrations/sources/zendesk-support.md b/docs/integrations/sources/zendesk-support.md index 7fc8ea1afa91..83cdf21b65f5 100644 --- a/docs/integrations/sources/zendesk-support.md +++ b/docs/integrations/sources/zendesk-support.md @@ -97,6 +97,7 @@ We recommend creating a restricted, read-only key specifically for Airbyte acces | Version | Date | Pull Request | Subject | |:---------|:-----------| :----- |:-------------------------------------------------------| +| `0.2.0` | 2022-03-01 | [9456](https://github.com/airbytehq/airbyte/pull/9456) | Update source to use future requests | | `0.1.12` | 2022-01-25 | [9785](https://github.com/airbytehq/airbyte/pull/9785) | Add log message | | `0.1.11` | 2021-12-21 | [8987](https://github.com/airbytehq/airbyte/pull/8987) | Update connector fields title/description | | `0.1.9` | 2021-12-16 | [8616](https://github.com/airbytehq/airbyte/pull/8616) | Adds Brands, CustomRoles and Schedules |
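For reference, the retry discipline that the new `read_records` in `streams.py` applies to each completed future reduces to the following sketch; `should_retry`, `backoff_time`, and `resend` are hypothetical stand-ins for the stream's own methods:

```python
import time
from collections import deque

def drain(future_requests: deque, should_retry, backoff_time, resend, max_retries=5):
    """Yield successful responses; re-enqueue rate-limited ones after a delay."""
    while future_requests:
        item = future_requests.popleft()
        response = item["future"].result()
        if should_retry(response):
            if item["retries"] == max_retries:
                raise RuntimeError("giving up: max retries reached")
            wait = backoff_time(response) - response.elapsed.total_seconds()
            if wait > 0:
                time.sleep(wait)
            item.update(future=resend(item["request"]), retries=item["retries"] + 1)
            future_requests.append(item)
        else:
            yield response
```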