diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index da507a63f82f..a129bba53bd4 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -617,7 +617,7 @@ - name: Zendesk Talk sourceDefinitionId: c8630570-086d-4a40-99ae-ea5b18673071 dockerRepository: airbyte/source-zendesk-talk - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-talk sourceType: api - sourceDefinitionId: cdaf146a-9b75-49fd-9dd2-9d64a0bb4781 diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index bbd6eb888cbe..9ccd93f20511 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5998,41 +5998,49 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-zendesk-talk:0.1.2" +- dockerImage: "airbyte/source-zendesk-talk:0.1.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/zendesk-talk" + changelogUrl: "https://docs.airbyte.io/integrations/sources/zendesk-talk" connectionSpecification: - $schema: "http://json-schema.org/draft-07/schema#" title: "Zendesk Talk Spec" type: "object" - required: - - "start_date" - - "subdomain" - - "access_token" - - "email" - additionalProperties: false properties: - start_date: - type: "string" - description: "The date from which you'd like to replicate data for Zendesk\ - \ Talk API, in the format YYYY-MM-DDT00:00:00Z." - examples: - - "2021-04-01T00:00:00Z" - pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" subdomain: + title: "Subdomain" + description: "The subdomain for your Zendesk Talk." type: "string" - description: "The subdomain for your Zendesk Talk" access_token: - type: "string" + title: "Access Token" description: "The value of the API token generated. See the docs for more information" + >docs for more information." airbyte_secret: true + type: "string" email: + title: "Email" + description: "The user email for your Zendesk account." type: "string" - description: "The user email for your Zendesk account" + start_date: + title: "Replication Start Date" + description: "The date/datetime from which you'd like to replicate data\ + \ for Zendesk Talk API, in the format YYYY-MM-DDT00:00:00Z. The time part\ + \ is optional." + pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$" + examples: + - "2017-01-25T00:00:00Z" + - "2017-01-25" + type: "string" + format: "date-time" + required: + - "subdomain" + - "access_token" + - "email" + - "start_date" + supportsIncremental: true supportsNormalization: false supportsDBT: false - supported_destination_sync_modes: [] + supported_destination_sync_modes: + - "append" - dockerImage: "airbyte/source-sentry:0.1.0" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/sentry" diff --git a/airbyte-integrations/connectors/source-zendesk-talk/.dockerignore b/airbyte-integrations/connectors/source-zendesk-talk/.dockerignore index 1b1c398629e9..09f92f1fb2ba 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/.dockerignore +++ b/airbyte-integrations/connectors/source-zendesk-talk/.dockerignore @@ -1,6 +1,6 @@ * !Dockerfile -!Dockerfile.test +!main.py !source_zendesk_talk !setup.py !secrets diff --git a/airbyte-integrations/connectors/source-zendesk-talk/Dockerfile b/airbyte-integrations/connectors/source-zendesk-talk/Dockerfile index 02bc24661fc7..b3ef1ba773b1 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/Dockerfile +++ b/airbyte-integrations/connectors/source-zendesk-talk/Dockerfile @@ -1,18 +1,16 @@ -FROM airbyte/integration-base-python:0.1.1 +FROM python:3.7-slim # Bash is installed for more convenient debugging. RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* -ENV CODE_PATH="source_zendesk_talk" -ENV AIRBYTE_IMPL_MODULE="source_zendesk_talk" -ENV AIRBYTE_IMPL_PATH="SourceZendeskTalk" - WORKDIR /airbyte/integration_code -COPY $CODE_PATH ./$CODE_PATH +COPY source_zendesk_talk ./source_zendesk_talk +COPY main.py ./ COPY setup.py ./ RUN pip install . -ENV AIRBYTE_ENTRYPOINT "/airbyte/base.sh" +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-zendesk-talk diff --git a/airbyte-integrations/connectors/source-zendesk-talk/README.md b/airbyte-integrations/connectors/source-zendesk-talk/README.md index 7b7eec033c3d..a414b1323f5e 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/README.md +++ b/airbyte-integrations/connectors/source-zendesk-talk/README.md @@ -1,6 +1,6 @@ -# Zendesk Talk Source +# Zendesk Talk Source -This is the repository for the Zendesk Talk source connector, written in Python. +This is the repository for the Zendesk Talk source connector, written in Python. For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/zendesk-talk). ## Local development @@ -44,19 +44,12 @@ See `sample_files/sample_config.json` for a sample config file. **If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source zendesk-talk test creds` and place them into `secrets/config.json`. - ### Locally running the connector ``` -python main_dev.py spec -python main_dev.py check --config secrets/config.json -python main_dev.py discover --config secrets/config.json -python main_dev.py read --config secrets/config.json --catalog sample_files/configured_catalog.json -``` - -### Unit Tests -To run unit tests locally, from the connector directory run: -``` -python -m pytest unit_tests +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json ``` ### Locally running the connector docker image @@ -82,19 +75,55 @@ docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-zendesk-talk:dev check docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-zendesk-talk:dev discover --config /secrets/config.json docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/sample_files:/sample_files airbyte/source-zendesk-talk:dev read --config /secrets/config.json --catalog /sample_files/configured_catalog.json ``` +## Testing + Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` ### Integration Tests -1. From the airbyte project root, run `./gradlew :airbyte-integrations:connectors:source-zendesk-talk:integrationTest` to run the standard integration test suite. -1. To run additional integration tests, place your integration tests in a new directory `integration_tests` and run them with `python -m pytest -s integration_tests`. - Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information. +If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with docker + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unittest run: +``` +./gradlew :airbyte-integrations:connectors:source-zendesk-talk:unitTest +``` +To run acceptance and custom integration tests run: +``` +./gradlew :airbyte-integrations:connectors:source-zendesk-talk:IntegrationTest +``` ## Dependency Management All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list ### Publishing a new version of the connector You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? 1. Make sure your changes are passing unit and integration tests -1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use SemVer). +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). 1. Create a Pull Request 1. Pat yourself on the back for being an awesome contributor 1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master diff --git a/airbyte-integrations/connectors/source-zendesk-talk/acceptance-test-config.yml b/airbyte-integrations/connectors/source-zendesk-talk/acceptance-test-config.yml new file mode 100644 index 000000000000..ee4de5d3440e --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-talk/acceptance-test-config.yml @@ -0,0 +1,26 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +# intentionally left out explicit configured_catalog.json to test all streams from discovery +connector_image: airbyte/source-zendesk-talk:dev +tests: + spec: + - spec_path: "integration_tests/spec.json" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" + basic_read: + - config_path: "secrets/config.json" + incremental: + - config_path: "secrets/config.json" + future_state_path: "integration_tests/abnormal_state.json" + full_refresh: + - config_path: "secrets/config.json" + # Statistics streams (with single record) have artificial PK that changes everytime + ignored_fields: + "account_overview": ["current_timestamp"] + "agents_overview": ["current_timestamp"] + "current_queue_activity": ["current_timestamp"] diff --git a/airbyte-integrations/connectors/source-zendesk-talk/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-zendesk-talk/acceptance-test-docker.sh new file mode 100644 index 000000000000..e4d8b1cef896 --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-talk/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-zendesk-talk/build.gradle b/airbyte-integrations/connectors/source-zendesk-talk/build.gradle index 257ff5b36efc..5bbfaf2ddea1 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/build.gradle +++ b/airbyte-integrations/connectors/source-zendesk-talk/build.gradle @@ -1,27 +1,13 @@ plugins { id 'airbyte-python' id 'airbyte-docker' - id 'airbyte-standard-source-test-file' + id 'airbyte-source-acceptance-test' } airbytePython { moduleDirectory 'source_zendesk_talk' } -airbyteStandardSourceTestFile { - specPath = "source_zendesk_talk/spec.json" - configPath = "secrets/config.json" - configuredCatalogPath = "sample_files/configured_catalog.json" -} - -task("pythonIntegrationTests", type: PythonTask, dependsOn: installTestReqs) { - module = "pytest" - command = "-s integration_tests" -} - -integrationTest.dependsOn("pythonIntegrationTests") - dependencies { - implementation files(project(':airbyte-integrations:bases:base-standard-source-test-file').airbyteDocker.outputs) - implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs) + implementation files(project(':airbyte-integrations:bases:source-acceptance-test').airbyteDocker.outputs) } diff --git a/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/__init__.py b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/__init__.py new file mode 100644 index 000000000000..46b7376756ec --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/abnormal_state.json new file mode 100644 index 000000000000..6c8e657de064 --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/abnormal_state.json @@ -0,0 +1,8 @@ +{ + "calls": { + "updated_at": "2121-01-01T00:00:00Z" + }, + "call_legs": { + "updated_at": "2121-01-01T00:00:00Z" + } +} diff --git a/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/acceptance.py new file mode 100644 index 000000000000..0347f2a0b143 --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/acceptance.py @@ -0,0 +1,14 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + yield diff --git a/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/configured_catalog.json new file mode 100644 index 000000000000..c30bd7ae468c --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/configured_catalog.json @@ -0,0 +1,199 @@ +{ + "streams": [ + { + "stream": { + "name": "account_overview", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["current_timestamp"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "addresses", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "agents_activity", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["agent_id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "agents_overview", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["current_timestamp"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "calls", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["updated_at"], + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "call_legs", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["updated_at"], + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "current_queue_activity", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["current_timestamp"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "greetings", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "greeting_categories", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "ivr_menus", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "ivr_routes", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "ivrs", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + }, + { + "stream": { + "name": "phone_numbers", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": null, + "default_cursor_field": null, + "source_defined_primary_key": [["id"]], + "namespace": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "append", + "primary_key": null + } + ] +} diff --git a/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/integration_test.py deleted file mode 100644 index e33dbef07d52..000000000000 --- a/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/integration_test.py +++ /dev/null @@ -1,39 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# - - -import json -from pathlib import Path - -import pytest -from airbyte_protocol import ConfiguredAirbyteCatalog, Type -from base_python import AirbyteLogger -from source_zendesk_talk.source import SourceZendeskTalk - -BASE_DIRECTORY = Path(__file__).resolve().parent.parent - - -@pytest.fixture(name="config_credentials") -def config_credentials_fixture(): - with open(str(BASE_DIRECTORY / "secrets/config.json"), "r") as f: - return json.load(f) - - -@pytest.fixture(name="configured_catalog") -def configured_catalog_fixture(): - return ConfiguredAirbyteCatalog.parse_file(BASE_DIRECTORY / "sample_files/configured_catalog_activities_overview.json") - - -class TestZendeskTalkSource: - def test_streams_outputs_records(self, config_credentials, configured_catalog): - """ - Using standard tests is unreliable for Agent Activities and Agent Overview streams, - because the data there changes in real-time, therefore additional pytests are used. - """ - records = [] - for message in SourceZendeskTalk().read(AirbyteLogger(), config_credentials, configured_catalog): - if message.type == Type.RECORD: - records.append(message) - - assert len(records) > 0 diff --git a/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/invalid_config.json new file mode 100644 index 000000000000..53a3ef003b81 --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/invalid_config.json @@ -0,0 +1,6 @@ +{ + "email": "integration-test@airbyte.io", + "access_token": "some_token", + "subdomain": "domain", + "start_date": "2021-04-01T00:00:00Z" +} diff --git a/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/spec.json b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/spec.json new file mode 100644 index 000000000000..cf6171f0ad81 --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-talk/integration_tests/spec.json @@ -0,0 +1,40 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/sources/zendesk-talk", + "changelogUrl": "https://docs.airbyte.io/integrations/sources/zendesk-talk", + "connectionSpecification": { + "title": "Zendesk Talk Spec", + "type": "object", + "properties": { + "subdomain": { + "title": "Subdomain", + "description": "The subdomain for your Zendesk Talk.", + "type": "string" + }, + "access_token": { + "title": "Access Token", + "description": "The value of the API token generated. See the docs for more information.", + "airbyte_secret": true, + "type": "string" + }, + "email": { + "title": "Email", + "description": "The user email for your Zendesk account.", + "type": "string" + }, + "start_date": { + "title": "Replication Start Date", + "description": "The date/datetime from which you'd like to replicate data for Zendesk Talk API, in the format YYYY-MM-DDT00:00:00Z. The time part is optional.", + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$", + "examples": ["2017-01-25T00:00:00Z", "2017-01-25"], + "type": "string", + "format": "date-time" + } + }, + "required": ["subdomain", "access_token", "email", "start_date"] + }, + "supportsIncremental": true, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": ["append"], + "authSpecification": null +} diff --git a/airbyte-integrations/connectors/source-zendesk-talk/main_dev.py b/airbyte-integrations/connectors/source-zendesk-talk/main.py similarity index 83% rename from airbyte-integrations/connectors/source-zendesk-talk/main_dev.py rename to airbyte-integrations/connectors/source-zendesk-talk/main.py index fdfc0f72210c..948b2eed2310 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/main_dev.py +++ b/airbyte-integrations/connectors/source-zendesk-talk/main.py @@ -5,7 +5,7 @@ import sys -from base_python.entrypoint import launch +from airbyte_cdk.entrypoint import launch from source_zendesk_talk import SourceZendeskTalk if __name__ == "__main__": diff --git a/airbyte-integrations/connectors/source-zendesk-talk/requirements.txt b/airbyte-integrations/connectors/source-zendesk-talk/requirements.txt index dd447512e620..7be17a56d745 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/requirements.txt +++ b/airbyte-integrations/connectors/source-zendesk-talk/requirements.txt @@ -1,4 +1,3 @@ # This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies. --e ../../bases/airbyte-protocol --e ../../bases/base-python +-e ../../bases/source-acceptance-test -e . diff --git a/airbyte-integrations/connectors/source-zendesk-talk/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-zendesk-talk/sample_files/configured_catalog.json deleted file mode 100644 index d1e5973ad14e..000000000000 --- a/airbyte-integrations/connectors/source-zendesk-talk/sample_files/configured_catalog.json +++ /dev/null @@ -1,771 +0,0 @@ -{ - "streams": [ - { - "stream": { - "name": "phone_numbers", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "call_recording_consent": { - "type": ["null", "string"] - }, - "capabilities": { - "type": ["null", "object"], - "properties": { - "mms": { - "type": ["null", "boolean"] - }, - "sms": { - "type": ["null", "boolean"] - }, - "voice": { - "type": ["null", "boolean"] - } - } - }, - "categorised_greetings": { - "type": ["null", "object"] - }, - "categorised_greetings_with_sub_settings": { - "type": ["null", "object"] - }, - "country_code": { - "type": ["null", "string"] - }, - "created_at": { - "type": ["null", "string"] - }, - "default_greeting_ids": { - "type": ["null", "array"], - "items": { - "type": "string" - } - }, - "default_group_id": { - "type": ["null", "integer"] - }, - "display_number": { - "type": ["null", "string"] - }, - "external": { - "type": ["null", "boolean"] - }, - "failover_number": { - "type": ["null", "string"] - }, - "greeting_ids": { - "type": ["null", "array"] - }, - "group_ids": { - "type": ["null", "array"] - }, - "id": { - "type": ["null", "integer"] - }, - "ivr_id": { - "type": ["null", "integer"] - }, - "line_type": { - "type": ["null", "string"] - }, - "location": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", "string"] - }, - "nickname": { - "type": ["null", "string"] - }, - "number": { - "type": ["null", "string"] - }, - "outbound_enabled": { - "type": ["null", "boolean"] - }, - "priority": { - "type": ["null", "integer"] - }, - "recorded": { - "type": ["null", "boolean"] - }, - "schedule_id": { - "type": ["null", "integer"] - }, - "sms_enabled": { - "type": ["null", "boolean"] - }, - "sms_group_id": { - "type": ["null", "integer"] - }, - "token": { - "type": ["null", "string"] - }, - "toll_free": { - "type": ["null", "boolean"] - }, - "transcription": { - "type": ["null", "boolean"] - }, - "voice_enabled": { - "type": ["null", "boolean"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "addresses", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "city": { - "type": ["null", "string"] - }, - "country_code": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "integer"] - }, - "name": { - "type": ["null", "string"] - }, - "provider_reference": { - "type": ["null", "string"] - }, - "province": { - "type": ["null", "string"] - }, - "state": { - "type": ["null", "string"] - }, - "street": { - "type": ["null", "string"] - }, - "zip": { - "type": ["null", "string"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "greeting_categories", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "type": ["null", "integer"] - }, - "name": { - "type": ["null", "string"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "greetings", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "active": { - "type": ["null", "boolean"] - }, - "audio_name": { - "type": ["null", "string"] - }, - "audio_url": { - "type": ["null", "string"] - }, - "category_id": { - "type": ["null", "integer"] - }, - "default": { - "type": ["null", "boolean"] - }, - "default_lang": { - "type": ["null", "boolean"] - }, - "has_sub_settings": { - "type": ["null", "boolean"] - }, - "id": { - "type": ["null", "string"] - }, - "ivr_ids": { - "type": ["null", "array"], - "items": { - "type": ["string", "integer"] - } - }, - "name": { - "type": ["null", "string"] - }, - "pending": { - "type": ["null", "boolean"] - }, - "phone_number_ids": { - "type": ["null", "array"], - "items": { - "type": ["string", "integer"] - } - }, - "upload_id": { - "type": ["null", "integer"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "ivrs", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "type": ["null", "integer"] - }, - "menus": { - "type": ["null", "array"], - "items": { - "type": "object", - "properties": { - "default": { - "type": ["null", "boolean"] - }, - "greeting_id": { - "type": ["null", "integer"] - }, - "id": { - "type": ["null", "integer"] - }, - "ivr_id": { - "type": ["null", "integer"] - }, - "name": { - "type": ["null", "string"] - }, - "routes": { - "type": ["array", "null"], - "items": { - "type": "object", - "properties": { - "action": { - "type": ["null", "string"] - }, - "greeting": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "integer"] - }, - "keypress": { - "type": ["null", "string"] - }, - "option_text": { - "type": ["null", "string"] - }, - "options": { - "type": ["null", "object"] - }, - "overflow_options": { - "type": ["null", "object"] - } - } - } - } - } - } - }, - "name": { - "type": ["null", "string"] - }, - "phone_number_ids": { - "type": ["null", "array"], - "items": { - "type": ["string", "integer"] - } - }, - "phone_number_names": { - "type": ["null", "array"], - "items": { - "type": ["integer", "string"] - } - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "ivr_menus", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "default": { - "type": ["null", "boolean"] - }, - "greeting_id": { - "type": ["null", "integer"] - }, - "id": { - "type": ["null", "integer"] - }, - "ivr_id": { - "type": ["null", "integer"] - }, - "name": { - "type": ["null", "string"] - }, - "routes": { - "type": ["array", "null"], - "items": { - "type": "object", - "properties": { - "action": { - "type": ["null", "string"] - }, - "greeting": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "integer"] - }, - "keypress": { - "type": ["null", "string"] - }, - "option_text": { - "type": ["null", "string"] - }, - "options": { - "type": ["null", "object"] - }, - "overflow_options": { - "type": ["null", "object"] - } - } - } - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "ivr_routes", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "action": { - "type": ["null", "string"] - }, - "greeting": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "integer"] - }, - "ivr_id": { - "type": ["null", "integer"] - }, - "ivr_menu_id": { - "type": ["null", "integer"] - }, - "keypress": { - "type": ["null", "string"] - }, - "option_text": { - "type": ["null", "string"] - }, - "options": { - "type": ["null", "object"] - }, - "overflow_options": { - "type": ["null", "object"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "account_overview", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "average_call_duration": { - "type": ["null", "integer"] - }, - "average_queue_wait_time": { - "type": ["null", "integer"] - }, - "average_wrap_up_time": { - "type": ["null", "integer"] - }, - "max_calls_waiting": { - "type": ["null", "integer"] - }, - "max_queue_wait_time": { - "type": ["null", "integer"] - }, - "total_call_duration": { - "type": ["null", "integer"] - }, - "total_calls": { - "type": ["null", "integer"] - }, - "total_voicemails": { - "type": ["null", "integer"] - }, - "total_wrap_up_time": { - "type": ["null", "integer"] - }, - "average_callback_wait_time": { - "type": ["null", "integer"] - }, - "average_hold_time": { - "type": ["null", "integer"] - }, - "average_time_to_answer": { - "type": ["null", "integer"] - }, - "total_callback_calls": { - "type": ["null", "integer"] - }, - "total_calls_abandoned_in_queue": { - "type": ["null", "integer"] - }, - "total_calls_outside_business_hours": { - "type": ["null", "integer"] - }, - "total_calls_with_exceeded_queue_wait_time": { - "type": ["null", "integer"] - }, - "total_calls_with_requested_voicemail": { - "type": ["null", "integer"] - }, - "total_hold_time": { - "type": ["null", "integer"] - }, - "total_inbound_calls": { - "type": ["null", "integer"] - }, - "total_outbound_calls": { - "type": ["null", "integer"] - }, - "total_textback_requests": { - "type": ["null", "integer"] - }, - "total_embeddable_callback_calls": { - "type": ["null", "integer"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "current_queue_activity", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "agents_online": { - "type": ["null", "integer"] - }, - "average_wait_time": { - "type": ["null", "integer"] - }, - "callbacks_waiting": { - "type": ["null", "integer"] - }, - "calls_waiting": { - "type": ["null", "integer"] - }, - "embeddable_callbacks_waiting": { - "type": ["null", "integer"] - }, - "longest_wait_time": { - "type": ["null", "integer"] - } - } - }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "calls", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "agent_id": { - "type": ["null", "integer"] - }, - "call_charge": { - "type": ["null", "string"] - }, - "call_group_id": { - "type": ["null", "integer"] - }, - "call_recording_consent": { - "type": ["null", "string"] - }, - "call_recording_consent_action": { - "type": ["null", "string"] - }, - "call_recording_consent_keypress": { - "type": ["null", "string"] - }, - "callback": { - "type": ["null", "boolean"] - }, - "callback_source": { - "type": ["null", "string"] - }, - "completion_status": { - "type": ["null", "string"] - }, - "consultation_time": { - "type": ["null", "integer"] - }, - "created_at": { - "type": ["null", "string"] - }, - "customer_requested_voicemail": { - "type": ["null", "boolean"] - }, - "default_group": { - "type": ["null", "boolean"] - }, - "direction": { - "type": ["null", "string"] - }, - "duration": { - "type": ["null", "integer"] - }, - "exceeded_queue_time": { - "type": ["null", "boolean"] - }, - "hold_time": { - "type": ["null", "integer"] - }, - "id": { - "type": ["null", "integer"] - }, - "ivr_action": { - "type": ["null", "string"] - }, - "ivr_destination_group_name": { - "type": ["null", "string"] - }, - "ivr_hops": { - "type": ["null", "integer"] - }, - "ivr_routed_to": { - "type": ["null", "string"] - }, - "ivr_time_spent": { - "type": ["null", "integer"] - }, - "minutes_billed": { - "type": ["null", "integer"] - }, - "not_recording_time": { - "type": ["null", "integer"] - }, - "outside_business_hours": { - "type": ["null", "boolean"] - }, - "overflowed": { - "type": ["null", "boolean"] - }, - "overflowed_to": { - "type": ["null", "string"] - }, - "phone_number": { - "type": ["null", "string"] - }, - "phone_number_id": { - "type": ["null", "integer"] - }, - "quality_issues": { - "type": ["null", "array"] - }, - "recording_control_interactions": { - "type": ["null", "integer"] - }, - "recording_time": { - "type": ["null", "integer"] - }, - "talk_time": { - "type": ["null", "integer"] - }, - "ticket_id": { - "type": ["null", "integer"] - }, - "time_to_answer": { - "type": ["null", "integer"] - }, - "updated_at": { - "type": ["null", "string"] - }, - "voicemail": { - "type": ["null", "boolean"] - }, - "wait_time": { - "type": ["null", "integer"] - }, - "wrap_up_time": { - "type": ["null", "integer"] - } - } - }, - "supported_sync_modes": ["incremental"], - "source_defined_cursor": true - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "call_legs", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "agent_id": { - "type": ["null", "integer"] - }, - "available_via": { - "type": ["null", "string"] - }, - "call_charge": { - "type": ["null", "string"] - }, - "call_id": { - "type": ["null", "integer"] - }, - "completion_status": { - "type": ["null", "string"] - }, - "conference_from": { - "type": ["null", "integer"] - }, - "conference_time": { - "type": ["null", "integer"] - }, - "conference_to": { - "type": ["null", "integer"] - }, - "consultation_from": { - "type": ["null", "integer"] - }, - "consultation_time": { - "type": ["null", "integer"] - }, - "consultation_to": { - "type": ["null", "integer"] - }, - "created_at": { - "type": ["null", "string"] - }, - "duration": { - "type": ["null", "integer"] - }, - "forwarded_to": { - "type": ["null", "string"] - }, - "hold_time": { - "type": ["null", "integer"] - }, - "id": { - "type": ["null", "integer"] - }, - "minutes_billed": { - "type": ["null", "integer"] - }, - "quality_issues": { - "type": ["null", "array"] - }, - "talk_time": { - "type": ["null", "integer"] - }, - "transferred_from": { - "type": ["null", "integer"] - }, - "transferred_to": { - "type": ["null", "integer"] - }, - "type": { - "type": ["null", "string"] - }, - "updated_at": { - "type": ["null", "string"] - }, - "user_id": { - "type": ["null", "integer"] - }, - "wrap_up_time": { - "type": ["null", "integer"] - } - } - }, - "supported_sync_modes": ["incremental"], - "source_defined_cursor": true - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - } - ] -} diff --git a/airbyte-integrations/connectors/source-zendesk-talk/setup.py b/airbyte-integrations/connectors/source-zendesk-talk/setup.py index 1a374c5ba2bf..d8a499f8c4b2 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/setup.py +++ b/airbyte-integrations/connectors/source-zendesk-talk/setup.py @@ -5,16 +5,13 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = [ - "airbyte-protocol", - "base-python", - "backoff==1.10.0", - "pendulum==1.2.0", - "requests==2.25.1", -] - -TEST_REQUIREMENTS = ["pytest", "requests_mock==1.8.0"] +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1"] +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "pytest-mock~=3.6", + "requests_mock~=1.8", +] setup( name="source_zendesk_talk", @@ -22,6 +19,9 @@ author="Airbyte", author_email="contact@airbyte.io", packages=find_packages(), - install_requires=MAIN_REQUIREMENTS + TEST_REQUIREMENTS, + install_requires=MAIN_REQUIREMENTS, package_data={"": ["*.json", "schemas/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, ) diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/__init__.py b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/__init__.py index b9d0247ebef6..cd0c4e4269e4 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/__init__.py +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/__init__.py @@ -1,3 +1,6 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# from .source import SourceZendeskTalk __all__ = ["SourceZendeskTalk"] diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/api.py b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/api.py deleted file mode 100644 index ab975ee9c92a..000000000000 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/api.py +++ /dev/null @@ -1,313 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# - - -import sys -from abc import ABC, abstractmethod -from functools import partial -from http import HTTPStatus -from typing import Any, Callable, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union - -import backoff -import pendulum as pendulum -import requests -from base_python.entrypoint import logger - -from .errors import ZendeskAccessDenied, ZendeskInvalidAuth, ZendeskRateLimited, ZendeskTimeout - - -def retry_pattern(backoff_type, **wait_gen_kwargs): - def sleep_on_ratelimit(details): - _, exc, _ = sys.exc_info() - logger.info(str(exc)) - logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} more seconds then retrying...") - - def log_giveup(_details): - logger.error("Max retry limit reached") - - return backoff.on_exception( - backoff_type, - (ZendeskRateLimited, ZendeskTimeout), - jitter=None, - on_backoff=sleep_on_ratelimit, - on_giveup=log_giveup, - **wait_gen_kwargs, - ) - - -class API: - """Zendesk Talk API interface, authorize, retrieve and post, supports backoff logic""" - - def __init__(self, subdomain: str, access_token: str, email: str): - self.BASE_URL = f"https://{subdomain}.zendesk.com/api/v2/channels/voice" - self._session = requests.Session() - self._session.headers = {"Content-Type": "application/json"} - self._session.auth = (f"{email}/token", access_token) - - @staticmethod - def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]: - """Handle response""" - message = "Unknown error" - if response.headers.get("content-type") == "application/json;charset=utf-8" and response.status_code != HTTPStatus.OK: - message = response.json().get("message") - - if response.status_code == HTTPStatus.FORBIDDEN: - raise ZendeskAccessDenied(response.text, response=response) - elif response.status_code == HTTPStatus.UNAUTHORIZED: - raise ZendeskInvalidAuth(response.text, response=response) - elif response.status_code == HTTPStatus.TOO_MANY_REQUESTS: - raise ZendeskRateLimited( - "429 Rate Limit Exceeded: API rate-limit. See https://developer.zendesk.com/rest_api/docs/support/usage_limits", - response=response, - ) - elif response.status_code in (HTTPStatus.BAD_GATEWAY, HTTPStatus.SERVICE_UNAVAILABLE): - raise ZendeskTimeout(message, response=response) - else: - response.raise_for_status() - - return response.json() - - @retry_pattern(backoff.expo, max_tries=4, factor=5) - def get(self, url: str, domain_inclusion=False, params=None) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]: - response = self._session.get(url if domain_inclusion else self.BASE_URL + url, params=params or {}) - return self._parse_and_handle_errors(response) - - -class Stream(ABC): - """Base class for all streams. Responsible for data fetching and pagination""" - - stream_stats = False - has_more = "next_page" - count_field = "count" - data_field = "results" - - @property - @abstractmethod - def url(self): - """Default URL to read from""" - - def __init__(self, api: API, start_date: str = None, **kwargs): - self._api: API = api - self._start_date = pendulum.parse(start_date) - - @property - def name(self) -> str: - stream_name = self.__class__.__name__ - if stream_name.endswith("Stream"): - stream_name = stream_name[: -len("Stream")] - return stream_name - - def list(self, fields) -> Iterable: - if self.stream_stats: - yield self.read_stats(partial(self._api.get, url=self.url)) - else: - yield from self.read(partial(self._api.get, url=self.url)) - - def _paginator(self, getter: Callable) -> Iterator: - domain_inclusion = False - counter = 0 - while True: - response = getter(domain_inclusion=domain_inclusion) - if response.get(self.data_field) is None: - raise RuntimeError("Unexpected API response: {} not in {}".format(self.data_field, response.keys())) - - yield from response[self.data_field] - counter += len(response[self.data_field]) - - if response[self.count_field] <= counter: - break - else: - getter.keywords.update({"url": response[self.has_more], "params": None}) - domain_inclusion = True - - def read_stats(self, getter: Callable) -> Any: - response = getter() - return response[self.data_field] - - def read(self, getter: Callable) -> Iterator: - yield from self._paginator(getter) - - -class IncrementalStream(Stream, ABC): - """Stream that supports state and incremental read""" - - state_pk = "timestamp" - - @property - @abstractmethod - def updated_at_field(self): - """Name of the field associated with the state""" - - @property - def state(self) -> Optional[Mapping[str, Any]]: - """Current state, if wasn't set return None""" - if self._state: - return {self.state_pk: str(self._state)} - return None - - @state.setter - def state(self, value): - self._state = pendulum.parse(value[self.state_pk]) - self._start_date = max(self._state, self._start_date) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._state = None - - def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: - """Apply state filter to set of records, update cursor(state) if necessary in the end""" - latest_cursor = None - for record in self._paginator(getter): - yield record - cursor = pendulum.parse(record[self.updated_at_field]) - latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor - - if latest_cursor: - new_state = max(latest_cursor, self._state) if self._state else latest_cursor - if new_state != self._state: - logger.info(f"Advancing bookmark for {self.name} stream from {self._state} to {latest_cursor}") - self._state = new_state - self._start_date = self._state - - -class PhoneNumbersStream(Stream): - """Phone Numbers - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/phone_numbers#list-phone-numbers - """ - - url = "/phone_numbers" - data_field = "phone_numbers" - - -class AddressesStream(Stream): - """Addresses - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/phone_numbers#list-phone-numbers - """ - - url = "/addresses" - data_field = "addresses" - - -class GreetingCategoriesStream(Stream): - """Greeting Categories - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/greetings#list-greeting-categories - """ - - url = "/greeting_categories" - data_field = "greeting_categories" - - -class GreetingsStream(Stream): - """Greetings - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/greetings#list-greetings - """ - - url = "/greetings" - data_field = "greetings" - - -class IVRsStream(Stream): - """IVRs - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/ivrs#list-ivrs - """ - - url = "/ivr" - data_field = "ivrs" - - -class IVRMenusStream(Stream): - """IVR Menus - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/ivrs#list-ivrs - """ - - url = "/ivr/{}/menus" - data_field = "ivr_menus" - - def list(self, fields) -> Iterable: - ivr_obj = IVRsStream(api=self._api, start_date=str(self._start_date)) - for ivr in ivr_obj.list(fields=[]): - for ivr_menu in self.read(partial(self._api.get, url=self.url.format(ivr["id"]))): - yield {"ivr_id": ivr["id"], **ivr_menu} - - -class IVRRoutesStream(Stream): - """IVR Routes - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/ivr_routes#list-ivr-routes - """ - - url = "/ivr/{}/menus/{}/routes" - data_field = "ivr_routes" - - def list(self, fields) -> Iterable: - ivr_menu_obj = IVRMenusStream(api=self._api, start_date=str(self._start_date)) - for ivr_menu in ivr_menu_obj.list(fields=[]): - for ivr_route in self.read(partial(self._api.get, url=self.url.format(ivr_menu["ivr_id"], ivr_menu["id"]))): - yield {"ivr_id": ivr_menu["ivr_id"], "ivr_menu_id": ivr_menu["id"], **ivr_route} - - -class AccountOverviewStream(Stream): - """Account Overview - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/stats#show-account-overview - """ - - url = "/stats/account_overview" - data_field = "account_overview" - stream_stats = True - - -class AgentsActivityStream(Stream): - """Agents Activity - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/stats#list-agents-activity - """ - - url = "/stats/agents_activity" - data_field = "agents_activity" - - -class AgentsOverviewStream(Stream): - """Agents Overview - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/stats#show-agents-overview - """ - - url = "/stats/agents_overview" - data_field = "agents_overview" - stream_stats = True - - -class CurrentQueueActivityStream(Stream): - """Current Queue Activity - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/stats#show-current-queue-activity - """ - - url = "/stats/current_queue_activity" - data_field = "current_queue_activity" - stream_stats = True - - -class CallsStream(IncrementalStream): - """Calls - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/incremental_exports#incremental-calls-export - """ - - url = "/stats/incremental/calls" - data_field = "calls" - updated_at_field = "updated_at" - - def list(self, fields) -> Iterable: - params = {"start_time": int(self._start_date.timestamp())} - yield from self.read(partial(self._api.get, url=self.url, params=params)) - - -class CallLegsStream(IncrementalStream): - """Call Legs - Docs: https://developer.zendesk.com/rest_api/docs/voice-api/incremental_exports#incremental-call-legs-export - """ - - url = "/stats/incremental/legs" - data_field = "legs" - updated_at_field = "updated_at" - - def list(self, fields) -> Iterable: - params = {"start_time": int(self._start_date.timestamp())} - yield from self.read(partial(self._api.get, url=self.url, params=params)) diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/client.py b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/client.py deleted file mode 100644 index 668969a8a5f1..000000000000 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/client.py +++ /dev/null @@ -1,80 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# - - -from typing import Any, Mapping, Tuple - -from base_python import BaseClient -from requests import HTTPError - -from .api import ( - API, - AccountOverviewStream, - AddressesStream, - AgentsActivityStream, - AgentsOverviewStream, - CallLegsStream, - CallsStream, - CurrentQueueActivityStream, - GreetingCategoriesStream, - GreetingsStream, - IVRMenusStream, - IVRRoutesStream, - IVRsStream, - PhoneNumbersStream, -) - - -class Client(BaseClient): - """Zendesk client, provides methods to discover and read streams""" - - def __init__(self, start_date: str, subdomain: str, access_token: str, email: str, **kwargs): - self._start_date = start_date - self._api = API(subdomain=subdomain, access_token=access_token, email=email) - - common_params = dict(api=self._api, start_date=self._start_date) - self._apis = { - "phone_numbers": PhoneNumbersStream(**common_params), - "addresses": AddressesStream(**common_params), - "greeting_categories": GreetingCategoriesStream(**common_params), - "greetings": GreetingsStream(**common_params), - "ivrs": IVRsStream(**common_params), - "ivr_menus": IVRMenusStream(**common_params), - "ivr_routes": IVRRoutesStream(**common_params), - "account_overview": AccountOverviewStream(**common_params), - "agents_activity": AgentsActivityStream(**common_params), - "agents_overview": AgentsOverviewStream(**common_params), - "current_queue_activity": CurrentQueueActivityStream(**common_params), - "calls": CallsStream(**common_params), - "call_legs": CallLegsStream(**common_params), - } - - super().__init__(**kwargs) - - def _enumerate_methods(self) -> Mapping[str, callable]: - return {name: api.list for name, api in self._apis.items()} - - def stream_has_state(self, name: str) -> bool: - """Tell if stream supports incremental sync""" - return hasattr(self._apis[name], "state") - - def get_stream_state(self, name: str) -> Any: - """Get state of stream with corresponding name""" - return self._apis[name].state - - def set_stream_state(self, name: str, state: Any): - """Set state of stream with corresponding name""" - self._apis[name].state = state - - def health_check(self) -> Tuple[bool, str]: - alive = True - error_msg = None - - try: - _ = list(self._apis["phone_numbers"].list(fields=[])) - except HTTPError as error: - alive = False - error_msg = repr(error) - - return alive, error_msg diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/errors.py b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/errors.py deleted file mode 100644 index 61728b776ebd..000000000000 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/errors.py +++ /dev/null @@ -1,33 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# - - -from requests import HTTPError - - -class ZendeskError(HTTPError): - """ - Base error class. - Subclassing HTTPError to avoid breaking existing code that expects only HTTPErrors. - """ - - -class ZendeskTimeout(ZendeskError): - """502/504 Zendesk has processing limits in place to prevent a single client from causing degraded performance, - and these responses indicate that those limits have been hit. You'll normally only see these timeout responses - when making a large number of requests over a sustained period. If you get one of these responses, - you should pause your requests for a few seconds, then retry. - """ - - -class ZendeskInvalidAuth(ZendeskError): - """401 Unauthorized""" - - -class ZendeskAccessDenied(ZendeskError): - """403 Forbidden""" - - -class ZendeskRateLimited(ZendeskError): - """429 Rate Limit Reached""" diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/account_overview.json b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/account_overview.json index ff4b8f16c41f..935cdb2b9efd 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/account_overview.json +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/account_overview.json @@ -2,6 +2,9 @@ "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { + "current_timestamp": { + "type": "integer" + }, "average_call_duration": { "type": ["null", "integer"] }, diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/agents_activity.json b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/agents_activity.json index 2c6f057e6188..cd11a4071d79 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/agents_activity.json +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/agents_activity.json @@ -3,7 +3,7 @@ "type": "object", "properties": { "agent_id": { - "type": ["null", "string"] + "type": ["null", "integer"] }, "agent_state": { "type": ["null", "string"] diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/agents_overview.json b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/agents_overview.json index de41dd73af93..14baa4555021 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/agents_overview.json +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/agents_overview.json @@ -2,6 +2,9 @@ "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { + "current_timestamp": { + "type": "integer" + }, "average_wrap_up_time": { "type": ["null", "integer"] }, diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/call_legs.json b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/call_legs.json index 104fde538056..5e3a0619dea8 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/call_legs.json +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/call_legs.json @@ -69,7 +69,8 @@ "type": ["null", "string"] }, "updated_at": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "user_id": { "type": ["null", "integer"] diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/calls.json b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/calls.json index 92b59886d478..7bd28996e9a5 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/calls.json +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/calls.json @@ -111,7 +111,8 @@ "type": ["null", "integer"] }, "updated_at": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "voicemail": { "type": ["null", "boolean"] diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/current_queue_activity.json b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/current_queue_activity.json index aa161c8a85e4..2b1a7ab7b09b 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/current_queue_activity.json +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/current_queue_activity.json @@ -2,6 +2,9 @@ "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { + "current_timestamp": { + "type": "integer" + }, "agents_online": { "type": ["null", "integer"] }, diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivr_menus.json b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivr_menus.json index d9b11276b3a5..125915b090b5 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivr_menus.json +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivr_menus.json @@ -16,35 +16,6 @@ }, "name": { "type": ["null", "string"] - }, - "routes": { - "type": ["array", "null"], - "items": { - "type": "object", - "properties": { - "action": { - "type": ["null", "string"] - }, - "greeting": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "integer"] - }, - "keypress": { - "type": ["null", "string"] - }, - "option_text": { - "type": ["null", "string"] - }, - "options": { - "type": ["null", "object"] - }, - "overflow_options": { - "type": ["null", "object"] - } - } - } } } } diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivr_routes.json b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivr_routes.json index 6870abe3b318..b6ccb5535ff4 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivr_routes.json +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivr_routes.json @@ -27,7 +27,7 @@ "type": ["null", "object"] }, "overflow_options": { - "type": ["null", "object"] + "type": ["null", "array"] } } } diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivrs.json b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivrs.json index f77db3d9459b..35e2f67d8f00 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivrs.json +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/schemas/ivrs.json @@ -49,7 +49,7 @@ "type": ["null", "object"] }, "overflow_options": { - "type": ["null", "object"] + "type": ["null", "array"] } } } diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/source.py b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/source.py index 22bf6a55b262..55741b4e128a 100644 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/source.py +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/source.py @@ -1,12 +1,95 @@ # # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # +from datetime import datetime +from typing import Any, List, Mapping, Tuple +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.models import ConnectorSpecification, DestinationSyncMode, SyncMode +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from pydantic import BaseModel, Field +from requests.auth import HTTPBasicAuth +from source_zendesk_talk.streams import ( + AccountOverview, + Addresses, + AgentsActivity, + AgentsOverview, + CallLegs, + Calls, + CurrentQueueActivity, + GreetingCategories, + Greetings, + IVRMenus, + IVRRoutes, + IVRs, + PhoneNumbers, +) -from base_python import BaseSource -from .client import Client +class ConnectorConfig(BaseModel): + class Config: + title = "Zendesk Talk Spec" + subdomain: str = Field( + description="The subdomain for your Zendesk Talk.", + ) + access_token: str = Field( + description='The value of the API token generated. See the docs for more information.', + airbyte_secret=True, + ) + email: str = Field(description="The user email for your Zendesk account.") + start_date: datetime = Field( + title="Replication Start Date", + description="The date/datetime from which you'd like to replicate data for Zendesk Talk API, in the format YYYY-MM-DDT00:00:00Z. The time part is optional.", + pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$", + examples=["2017-01-25T00:00:00Z", "2017-01-25"], + ) -class SourceZendeskTalk(BaseSource): - client_class = Client + +class SourceZendeskTalk(AbstractSource): + def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: + parsed_config = ConnectorConfig.parse_obj(config) + authenticator = HTTPBasicAuth(username=f"{parsed_config.email}/token", password=parsed_config.access_token) + stream = AccountOverview(authenticator=authenticator, subdomain=parsed_config.subdomain) + + account_info = next(iter(stream.read_records(sync_mode=SyncMode.full_refresh)), None) + if not account_info: + raise RuntimeError("Unable to read account information, please check the permissions of your token") + + return True, None + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + parsed_config = ConnectorConfig.parse_obj(config) + authenticator = HTTPBasicAuth(username=f"{parsed_config.email}/token", password=parsed_config.access_token) + common_kwargs = dict(authenticator=authenticator, subdomain=parsed_config.subdomain) + incremental_kwargs = dict(**common_kwargs, start_date=parsed_config.start_date) + + return [ + AccountOverview(**common_kwargs), + Addresses(**common_kwargs), + AgentsActivity(**common_kwargs), + AgentsOverview(**common_kwargs), + Calls(**incremental_kwargs), + CallLegs(**incremental_kwargs), + CurrentQueueActivity(**common_kwargs), + Greetings(**common_kwargs), + GreetingCategories(**common_kwargs), + IVRMenus(**common_kwargs), + IVRRoutes(**common_kwargs), + IVRs(**common_kwargs), + PhoneNumbers(**common_kwargs), + ] + + def spec(self, *args, **kwargs) -> ConnectorSpecification: + """ + Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password) + required to run this integration. + """ + return ConnectorSpecification( + documentationUrl="https://docs.airbyte.io/integrations/sources/zendesk-talk", + changelogUrl="https://docs.airbyte.io/integrations/sources/zendesk-talk", + supportsIncremental=True, + supported_destination_sync_modes=[DestinationSyncMode.append], + connectionSpecification=ConnectorConfig.schema(), + ) diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/spec.json b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/spec.json deleted file mode 100644 index 26e1dfdcf035..000000000000 --- a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/spec.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "documentationUrl": "https://docs.airbyte.io/integrations/sources/zendesk-talk", - "connectionSpecification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "Zendesk Talk Spec", - "type": "object", - "required": ["start_date", "subdomain", "access_token", "email"], - "additionalProperties": false, - "properties": { - "start_date": { - "type": "string", - "description": "The date from which you'd like to replicate data for Zendesk Talk API, in the format YYYY-MM-DDT00:00:00Z.", - "examples": ["2021-04-01T00:00:00Z"], - "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" - }, - "subdomain": { - "type": "string", - "description": "The subdomain for your Zendesk Talk" - }, - "access_token": { - "type": "string", - "description": "The value of the API token generated. See the docs for more information", - "airbyte_secret": true - }, - "email": { - "type": "string", - "description": "The user email for your Zendesk account" - } - } - } -} diff --git a/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/streams.py b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/streams.py new file mode 100644 index 000000000000..9475cadba77a --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-talk/source_zendesk_talk/streams.py @@ -0,0 +1,313 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Any, Iterable, Mapping, MutableMapping, Optional +from urllib.parse import parse_qs, urlparse + +import pendulum as pendulum +import requests +from airbyte_cdk.sources.streams.http import HttpStream + + +class ZendeskTalkStream(HttpStream, ABC): + """Base class for streams""" + + primary_key = "id" + + def __init__(self, subdomain: str, **kwargs): + """ Constructor, accepts subdomain to calculate correct url""" + super().__init__(**kwargs) + self._subdomain = subdomain + + @property + @abstractmethod + def data_field(self) -> str: + """ Specifies root object name in a stream response""" + + @property + def url_base(self) -> str: + """ API base url based on configured subdomain""" + return f"https://{self._subdomain}.zendesk.com/api/v2/channels/voice" + + def backoff_time(self, response: requests.Response) -> Optional[float]: + """ + Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header. + + This method is called only if should_backoff() returns True for the input request. + + :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff + to the default backoff behavior (e.g using an exponential algorithm). + """ + delay_time = response.headers.get("Retry-After") + if delay_time: + return int(delay_time) + return None + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + """ + This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed + to most other methods in this class to help you form headers, request bodies, query params, etc.. + + :param response: the most recent response from the API + :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response. + If there are no more pages in the result, return None. + """ + response_json = response.json() + next_page_url = response_json.get("next_page") + if next_page_url: + next_url = urlparse(next_page_url) + next_params = parse_qs(next_url.query) + return next_params + + return None + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> MutableMapping[str, Any]: + """Usually contains common params e.g. pagination size etc.""" + return dict(next_page_token or {}) + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ Simply parse json and iterates over root object""" + response_json = response.json() + if self.data_field: + response_json = response_json[self.data_field] + + if not isinstance(response_json, list): + response_json = [response_json] + + yield from response_json + + +class ZendeskTalkIncrementalStream(ZendeskTalkStream, ABC): + """Stream that supports state and incremental read, for now only incremental export endpoints use this class. + Docs: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports + """ + + # required to support old format as well (only read, but save as new) + legacy_cursor_field = "timestamp" + cursor_field = "updated_at" + filter_param = "start_time" + + def __init__(self, start_date: datetime, **kwargs): + super().__init__(**kwargs) + self._start_date = pendulum.instance(start_date) + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + """ + Override to determine the latest state after reading the latest record. This typically compared the cursor_field from the latest record and + the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental. + """ + latest_state = current_stream_state.get(self.cursor_field, current_stream_state.get(self.legacy_cursor_field)) + new_cursor_value = max(latest_record[self.cursor_field], latest_state or latest_record[self.cursor_field]) + return {self.cursor_field: new_cursor_value} + + def request_params(self, stream_state=None, **kwargs): + """ Add incremental parameters""" + params = super().request_params(stream_state=stream_state, **kwargs) + + if self.filter_param not in params: + # use cursor as filter value only if it is not already a parameter (i.e. we are in the middle of the pagination) + stream_state = stream_state or {} + state_str = stream_state.get(self.cursor_field, stream_state.get(self.legacy_cursor_field)) + state = pendulum.parse(state_str) if state_str else self._start_date + params[self.filter_param] = max(state, self._start_date).int_timestamp + + return params + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + """ + This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed + to most other methods in this class to help you form headers, request bodies, query params, etc.. + + :param response: the most recent response from the API + :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response. + If there are no more pages in the result, return None. + """ + next_params = super().next_page_token(response) + if not next_params: + return None + + current_url = urlparse(response.request.url) + current_params = parse_qs(current_url.query) + + # check if cursor value was changed + if current_params[self.filter_param] != next_params[self.filter_param]: + return next_params + + return None + + +class ZendeskTalkSingleRecordStream(ZendeskTalkStream, ABC): + primary_key = "current_timestamp" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + for record in super().parse_response(response, **kwargs): + record["current_timestamp"] = pendulum.now().int_timestamp + yield record + + +class PhoneNumbers(ZendeskTalkStream): + """Phone Numbers + Docs: https://developer.zendesk.com/api-reference/voice/talk-api/phone_numbers/#list-phone-numbers + """ + + data_field = "phone_numbers" + + def path(self, **kwargs) -> str: + return "/phone_numbers" + + +class Addresses(ZendeskTalkStream): + """Addresses + Docs: https://developer.zendesk.com/api-reference/voice/talk-api/addresses/#list-addresses + """ + + data_field = "addresses" + + def path(self, **kwargs) -> str: + return "/addresses" + + +class GreetingCategories(ZendeskTalkStream): + """Greeting Categories + Docs: https://developer.zendesk.com/rest_api/docs/voice-api/greetings#list-greeting-categories + """ + + data_field = "greeting_categories" + + def path(self, **kwargs) -> str: + return "/greeting_categories" + + +class Greetings(ZendeskTalkStream): + """Greetings + Docs: https://developer.zendesk.com/rest_api/docs/voice-api/greetings#list-greetings + """ + + data_field = "greetings" + + def path(self, **kwargs) -> str: + return "/greetings" + + +class IVRs(ZendeskTalkStream): + """IVRs + Docs: https://developer.zendesk.com/rest_api/docs/voice-api/ivrs#list-ivrs + """ + + name = "ivrs" + data_field = "ivrs" + use_cache = True + cache_filename = "ivrs.yml" + + def path(self, **kwargs) -> str: + return "/ivr.json" + + +class IVRMenus(IVRs): + """IVR Menus + Docs: https://developer.zendesk.com/rest_api/docs/voice-api/ivrs#list-ivrs + """ + + name = "ivr_menus" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ Simply parse json and iterates over root object""" + ivrs = super().parse_response(response=response, **kwargs) + for ivr in ivrs: + for menu in ivr["menus"]: + yield {"ivr_id": ivr["id"], **menu} + + +class IVRRoutes(IVRs): + """IVR Routes + Docs: https://developer.zendesk.com/rest_api/docs/voice-api/ivr_routes#list-ivr-routes + """ + + name = "ivr_routes" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ Simply parse json and iterates over root object""" + ivrs = super().parse_response(response=response, **kwargs) + for ivr in ivrs: + for menu in ivr["menus"]: + for route in ivr["menus"]: + yield {"ivr_id": ivr["id"], "ivr_menu_id": menu["id"], **route} + + +class AccountOverview(ZendeskTalkSingleRecordStream): + """Account Overview + Docs: https://developer.zendesk.com/rest_api/docs/voice-api/stats#show-account-overview + """ + + data_field = "account_overview" + + def path(self, **kwargs) -> str: + return "/stats/account_overview" + + +class AgentsActivity(ZendeskTalkStream): + """Agents Activity + Docs: https://developer.zendesk.com/rest_api/docs/voice-api/stats#list-agents-activity + """ + + data_field = "agents_activity" + primary_key = "agent_id" + + def path(self, **kwargs) -> str: + return "/stats/agents_activity" + + +class AgentsOverview(ZendeskTalkSingleRecordStream): + """Agents Overview + Docs: https://developer.zendesk.com/rest_api/docs/voice-api/stats#show-agents-overview + """ + + data_field = "agents_overview" + + def path(self, **kwargs) -> str: + return "/stats/agents_overview" + + +class CurrentQueueActivity(ZendeskTalkSingleRecordStream): + """Current Queue Activity + Docs: https://developer.zendesk.com/rest_api/docs/voice-api/stats#show-current-queue-activity + """ + + data_field = "current_queue_activity" + + def path(self, **kwargs) -> str: + return "/stats/current_queue_activity" + + +class Calls(ZendeskTalkIncrementalStream): + """Calls + Docs: https://developer.zendesk.com/rest_api/docs/voice-api/incremental_exports#incremental-calls-export + """ + + data_field = "calls" + cursor_field = "updated_at" + + def path(self, **kwargs) -> str: + return "/stats/incremental/calls" + + +class CallLegs(ZendeskTalkIncrementalStream): + """Call Legs + Docs: https://developer.zendesk.com/rest_api/docs/voice-api/incremental_exports#incremental-call-legs-export + """ + + data_field = "legs" + cursor_field = "updated_at" + + def path(self, **kwargs) -> str: + return "/stats/incremental/legs" + diff --git a/airbyte-integrations/connectors/source-zendesk-talk/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-zendesk-talk/unit_tests/test_streams.py new file mode 100644 index 000000000000..f4b22264008b --- /dev/null +++ b/airbyte-integrations/connectors/source-zendesk-talk/unit_tests/test_streams.py @@ -0,0 +1,235 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +from urllib.parse import urlparse + +import pendulum +import pytest +import requests +from source_zendesk_talk.streams import ZendeskTalkIncrementalStream, ZendeskTalkSingleRecordStream, ZendeskTalkStream + + +class NonIncrementalStream(ZendeskTalkStream): + data_field = "results" + + def path(self, **kwargs) -> str: + return "/test_path" + + +class IncrementalStream(ZendeskTalkIncrementalStream): + data_field = None + cursor_field = "updated_at" + + def path(self, **kwargs) -> str: + return "/test_path" + + +class SingleRecordStream(ZendeskTalkSingleRecordStream): + data_field = "results" + + def path(self, **kwargs) -> str: + return "/test_path" + + +def is_url(url: str) -> bool: + """Checking if provided string is a correct URL, i.e. good enough for urlparse + https://stackoverflow.com/a/52455972/656671 + """ + try: + result = urlparse(url) + return all([result.scheme, result.netloc]) + except ValueError: + return False + + +@pytest.fixture(name="now") +def now_fixture(mocker): + """Fixture to freeze the time""" + return mocker.patch("source_zendesk_talk.streams.pendulum.now", return_value=pendulum.now()) + + +class TestZendeskTalkStream: + def test_url_base(self, mocker): + stream = NonIncrementalStream(subdomain="mydomain", authenticator=mocker.Mock()) + + assert "mydomain" in stream.url_base + assert is_url(stream.url_base), "should be valid URL" + + def test_backoff_time(self, mocker): + stream = NonIncrementalStream(subdomain="mydomain", authenticator=mocker.Mock()) + response = mocker.Mock(spec=requests.Response) + response.headers = {"Retry-After": 10} + + result = stream.backoff_time(response) + + assert result == 10, "should return value from the header if set" + + def test_backoff_time_without_header(self, mocker): + stream = NonIncrementalStream(subdomain="mydomain", authenticator=mocker.Mock()) + response = mocker.Mock(spec=requests.Response) + response.headers = {} + + result = stream.backoff_time(response) + + assert result is None, "no backoff if the header is not set" + + def test_next_page_token(self, mocker): + stream = NonIncrementalStream(subdomain="mydomain", authenticator=mocker.Mock()) + response = mocker.Mock(spec=requests.Response) + response.json.return_value = {"next_page": "https://some.url.com?param1=20¶m2=value"} + + result = stream.next_page_token(response) + + assert result == {"param1": ["20"], "param2": ["value"]}, "should return all params from the next_url" + + def test_next_page_token_end(self, mocker): + stream = NonIncrementalStream(subdomain="mydomain", authenticator=mocker.Mock()) + response = mocker.Mock(spec=requests.Response) + response.json.return_value = {"next_page": None} + + result = stream.next_page_token(response) + + assert result is None, "last page should return no token" + + def test_request_params(self, mocker): + stream = NonIncrementalStream(subdomain="mydomain", authenticator=mocker.Mock()) + + result = stream.request_params(stream_state={}, next_page_token={"some": "token"}) + + assert result == {"some": "token"} + + def test_parse_response(self, mocker): + stream = NonIncrementalStream(subdomain="mydomain", authenticator=mocker.Mock()) + response = mocker.Mock(spec=requests.Response) + response.json.return_value = {stream.data_field: [{"record1"}, {"record2"}, {"record3"}], "some_other_data": 123} + + result = list(stream.parse_response(response=response)) + + assert result == [{"record1"}, {"record2"}, {"record3"}] + + def test_parse_response_from_root(self, mocker): + stream = NonIncrementalStream(subdomain="mydomain", authenticator=mocker.Mock()) + stream.data_field = None + response = mocker.Mock(spec=requests.Response) + response.json.return_value = [{"record1"}, {"record2"}, {"record3"}] + + result = list(stream.parse_response(response=response)) + + assert result == [{"record1"}, {"record2"}, {"record3"}] + + def test_parse_response_single_object(self, mocker): + stream = NonIncrementalStream(subdomain="mydomain", authenticator=mocker.Mock()) + response = mocker.Mock(spec=requests.Response) + response.json.return_value = {stream.data_field: {"record1"}, "some_other_data": 123} + + result = list(stream.parse_response(response=response)) + + assert result == [{"record1"}] + + +class TestZendeskTalkIncrementalStream: + def test_get_updated_state_first_run(self, mocker): + start_date = pendulum.now() + stream = IncrementalStream(subdomain="mydomain", authenticator=mocker.Mock(), start_date=start_date) + current_stream_state = {} + latest_record = {stream.cursor_field: "2020-03-03T01:00:00Z", "some_attr": "value"} + + new_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record) + + assert new_state == {stream.cursor_field: "2020-03-03T01:00:00Z"} + + def test_get_updated_state_desc_order(self, mocker): + start_date = pendulum.now() + stream = IncrementalStream(subdomain="mydomain", authenticator=mocker.Mock(), start_date=start_date) + current_stream_state = {stream.cursor_field: "2020-03-03T02:00:00Z"} + latest_record = {stream.cursor_field: "2020-03-03T01:00:00Z", "some_attr": "value"} + + new_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record) + + assert new_state == {stream.cursor_field: "2020-03-03T02:00:00Z"} + + def test_get_updated_state_legacy_cursor(self, mocker): + start_date = pendulum.now() + stream = IncrementalStream(subdomain="mydomain", authenticator=mocker.Mock(), start_date=start_date) + current_stream_state = {stream.legacy_cursor_field: "2020-03-03T02:00:00Z"} + latest_record = {stream.cursor_field: "2020-03-03T01:00:00Z", "some_attr": "value"} + + new_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record) + + assert new_state == {stream.cursor_field: "2020-03-03T02:00:00Z"} + + def test_get_updated_state(self, mocker): + start_date = pendulum.now() + stream = IncrementalStream(subdomain="mydomain", authenticator=mocker.Mock(), start_date=start_date) + current_stream_state = {stream.cursor_field: "2020-03-03T02:00:00Z"} + latest_record = {stream.cursor_field: "2020-03-03T03:00:00Z", "some_attr": "value"} + + new_state = stream.get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record) + + assert new_state == {stream.cursor_field: "2020-03-03T03:00:00Z"} + + def test_request_params_first_page_without_state(self, mocker): + start_date = pendulum.now() + stream = IncrementalStream(subdomain="mydomain", authenticator=mocker.Mock(), start_date=start_date) + + result = stream.request_params(stream_state={}) + assert result == {stream.filter_param: int(start_date.timestamp())}, "should fallback to start_date" + + def test_request_params_first_page_with_state(self, mocker): + start_date = pendulum.now() + stream = IncrementalStream(subdomain="mydomain", authenticator=mocker.Mock(), start_date=start_date) + + result = stream.request_params(stream_state={stream.cursor_field: "2020-03-03T03:00:00Z"}) + assert result == {stream.filter_param: int(start_date.timestamp())}, "pick always bigger timestamp" + + def test_request_params_pagination(self, mocker): + start_date = pendulum.now() + stream = IncrementalStream(subdomain="mydomain", authenticator=mocker.Mock(), start_date=start_date) + + result = stream.request_params( + stream_state={stream.cursor_field: "2020-03-03T03:00:00Z"}, + next_page_token={stream.filter_param: 12345}, + ) + assert result == {stream.filter_param: 12345}, "page token should always override" + + def test_next_page_token(self, mocker): + start_date = pendulum.now() + stream = IncrementalStream(subdomain="mydomain", authenticator=mocker.Mock(), start_date=start_date) + response = mocker.Mock(spec=requests.Response, request=mocker.Mock(spec=requests.Request)) + response.json.return_value = {"next_page": f"https://some.url.com?param1=20&{stream.filter_param}=value1"} + response.request.url = f"https://some.url.com?param1=30&{stream.filter_param}=value2" + + result = stream.next_page_token(response) + assert result == {"param1": ["20"], stream.filter_param: ["value1"]}, "take page token from next_page" + + def test_next_page_token_empty_response(self, mocker): + start_date = pendulum.now() + stream = IncrementalStream(subdomain="mydomain", authenticator=mocker.Mock(), start_date=start_date) + response = mocker.Mock(spec=requests.Response, request=mocker.Mock(spec=requests.Request)) + response.json.return_value = {"next_page": None} + response.request.url = f"https://some.url.com?param1=30&{stream.filter_param}=value2" + + result = stream.next_page_token(response) + assert result is None, "stop pagination if next page points to the current" + + def test_next_page_token_last_page(self, mocker): + start_date = pendulum.now() + stream = IncrementalStream(subdomain="mydomain", authenticator=mocker.Mock(), start_date=start_date) + response = mocker.Mock(spec=requests.Response, request=mocker.Mock(spec=requests.Request)) + response.json.return_value = {"next_page": f"https://some.url.com?param1=20&{stream.filter_param}=value"} + response.request.url = f"https://some.url.com?param1=30&{stream.filter_param}=value" + + result = stream.next_page_token(response) + assert result is None, "stop pagination if next page points to the current" + + +class TestSingleRecordZendeskTalkStream: + def test_parse_response(self, mocker, now): + stream = SingleRecordStream(subdomain="mydomain", authenticator=mocker.Mock()) + response = mocker.Mock(spec=requests.Response) + response.json.return_value = {stream.data_field: {"field1": "value", "field2": 3}, "some_other_data": 123} + + result = list(stream.parse_response(response=response)) + + assert result == [{"field1": "value", "field2": 3, stream.primary_key: int(now().timestamp())}] diff --git a/airbyte-integrations/connectors/source-zendesk-talk/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-zendesk-talk/unit_tests/unit_test.py deleted file mode 100644 index c3de36e07d47..000000000000 --- a/airbyte-integrations/connectors/source-zendesk-talk/unit_tests/unit_test.py +++ /dev/null @@ -1,44 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# - - -import pytest -from source_zendesk_talk.client import Client - - -@pytest.fixture(name="zendesk_credentials") -def zendesk_credentials_fixture(): - return { - "email": "fake-email@email.cm", - "access_token": "fake_access_token", - "subdomain": "wrong_subdomain", - "start_date": "2021-02-12T00:00:00Z", - } - - -def test_client_with_wrong_credentials(zendesk_credentials): - """Test check with wrong credentials""" - client = Client(**zendesk_credentials) - - alive, error = client.health_check() - - assert not alive - assert error - - -def test_client_backoff_on_limit_reached(requests_mock, zendesk_credentials): - """Error twice, check that we retry and not fail""" - responses = [ - {"json": {"error": "limit reached"}, "status_code": 429}, - {"json": {"error": "limit reached"}, "status_code": 429}, - {"json": {"phone_numbers": [], "count": 0}, "status_code": 200}, - ] - - requests_mock.get(f"https://{zendesk_credentials['subdomain']}.zendesk.com/api/v2/channels/voice/phone_numbers", responses) - client = Client(**zendesk_credentials) - - alive, error = client.health_check() - - assert alive - assert not error diff --git a/docs/integrations/sources/zendesk-talk.md b/docs/integrations/sources/zendesk-talk.md index 979198eacf5e..f8b4a3d0db4a 100644 --- a/docs/integrations/sources/zendesk-talk.md +++ b/docs/integrations/sources/zendesk-talk.md @@ -61,3 +61,8 @@ Generate a API access token as described in [Zendesk docs](https://support.zende We recommend creating a restricted, read-only key specifically for Airbyte access. This will allow you to control which resources Airbyte should be able to access. +### CHANGELOG + +| Version | Date | Pull Request | Subject | +| :------ | :-------- | :----- | :------ | +| `0.1.3` | 2021-11-11 | [7173](https://github.com/airbytehq/airbyte/pull/7173) | Fix pagination and migrate to CDK |