Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Zendesk Chat: fix chats stream only pulling the first page #7210

Merged
merged 5 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "40d24d0f-b8f9-4fe0-9e6c-b06c0f3f45e4",
"name": "Zendesk Chat",
"dockerRepository": "airbyte/source-zendesk-chat",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/zendesk-chat",
"icon": "zendesk.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@
- name: Zendesk Chat
sourceDefinitionId: 40d24d0f-b8f9-4fe0-9e6c-b06c0f3f45e4
dockerRepository: airbyte/source-zendesk-chat
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-chat
icon: zendesk.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
*
!Dockerfile
!Dockerfile.test
!main_dev.py
!source_zendesk_chat
!setup.py
!secrets
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
FROM airbyte/integration-base-python:0.1.6
FROM python:3.7-slim

# Bash is installed for more convenient debugging.
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*

ENV CODE_PATH="source_zendesk_chat"
ENV AIRBYTE_IMPL_MODULE="source_zendesk_chat"
ENV AIRBYTE_IMPL_PATH="SourceZendeskChat"
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main_dev.py"

WORKDIR /airbyte/integration_code
COPY $CODE_PATH ./$CODE_PATH
COPY main_dev.py ./
COPY setup.py ./
RUN pip install .

ENV AIRBYTE_ENTRYPOINT "/airbyte/base.sh"
ENTRYPOINT ["python", "/airbyte/integration_code/main_dev.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-zendesk-chat
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ tests:
configured_catalog_path: "sample_files/configured_catalog.json"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "sample_files/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalog_incremental.json"
# Unable to use 'future_state_path' because the Zendesk Chat API returns an error when a date in the future is specified.
# future_state_path: "integration_tests/abnormal_state.json"
cursor_paths:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"streams": [
{
"stream": {
"name": "chats",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"streams": [
{
"stream": {
"name": "agents",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["id"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["id"]
},
{
"stream": {
"name": "agent_timeline",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["start_time"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["start_time"]
},
{
"stream": {
"name": "bans",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["id"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["id"]
}
]
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
{
"credentials": {
"access_token": "wrongkey-access-token",
"start_date": "2020-12-12T00:00:00Z"
}
"access_token": "wrongkey-access-token",
"start_date": "2020-12-12T00:00:00Z"
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import sys

from base_python.entrypoint import launch
from airbyte_cdk.entrypoint import launch
from source_zendesk_chat import SourceZendeskChat

if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,5 @@
{
"streams": [
{
"stream": {
"name": "agents",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["id"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["id"]
},
{
"stream": {
"name": "agent_timeline",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["start_time"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["start_time"]
},
{
"stream": {
"name": "accounts",
Expand Down Expand Up @@ -60,18 +36,6 @@
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "bans",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["id"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["id"]
},
{
"stream": {
"name": "departments",
Expand Down Expand Up @@ -107,15 +71,6 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "routing_settings",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"access_token": "<your_access_token>",
"start_date": "2020-11-01T00:00:00"
"start_date": "2020-11-01T00:00:00Z"
}
5 changes: 2 additions & 3 deletions airbyte-integrations/connectors/source-zendesk-chat/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-protocol",
"base-python",
"airbyte-cdk~=0.1",
"pendulum==1.2.0",
"requests==2.25.1",
]

TEST_REQUIREMENTS = ["pytest==6.1.2", "source-acceptance-test"]
TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock"]

setup(
name="source_zendesk_chat",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,26 @@

from typing import Any, List, Mapping, Tuple

from airbyte_protocol import SyncMode
from base_python import AbstractSource, Stream, TokenAuthenticator
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

from .api import Accounts, Agents, AgentTimelines, Bans, Chats, Departments, Goals, Roles, RoutingSettings, Shortcuts, Skills, Triggers
from .streams import Accounts, Agents, AgentTimelines, Bans, Chats, Departments, Goals, Roles, RoutingSettings, Shortcuts, Skills, Triggers


class SourceZendeskChat(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
try:
authenticator = TokenAuthenticator(token=config["access_token"])
list(RoutingSettings(authenticator=authenticator).read_records(SyncMode.full_refresh))
authenticator = TokenAuthenticator(config["access_token"])
records = RoutingSettings(authenticator=authenticator).read_records(sync_mode=SyncMode.full_refresh)
next(records)
return True, None
except Exception as error:
return False, f"Unable to connect to Zendesk Chat API with the provided credentials - {error}"

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = TokenAuthenticator(token=config["access_token"])
authenticator = TokenAuthenticator(config["access_token"])
return [
Agents(authenticator=authenticator),
AgentTimelines(authenticator=authenticator, start_date=config["start_date"]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@

from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib.parse import parse_qs, urlparse

import pendulum
import requests
from base_python import HttpStream
from airbyte_cdk.sources.streams.http import HttpStream


class Stream(HttpStream):
class Stream(HttpStream, ABC):
url_base = "https://www.zopim.com/api/v2/"
primary_key = "id"

data_field = None

limit = 100

def backoff_time(self, response: requests.Response) -> Optional[float]:
Expand All @@ -26,7 +29,12 @@ def path(self, **kwargs) -> str:
return self.name

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
return {}
response_data = response.json()

if "next_url" in response_data:
next_url = response_data["next_url"]
cursor = parse_qs(urlparse(next_url).query)["cursor"]
return {"cursor": cursor}

def request_params(
self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs
Expand All @@ -48,12 +56,15 @@ def get_stream_data(self, response_data: Any) -> List[dict]:
response_data = response_data.get(self.data_field, [])

if isinstance(response_data, list):
return response_data
return list(map(self.parse_response_obj, response_data))
elif isinstance(response_data, dict):
return [response_data]
return [self.parse_response_obj(response_data)]
else:
raise Exception(f"Unsupported type of response data for stream {self.name}")

def parse_response_obj(self, response_obj: dict) -> dict:
    """Per-record hook applied to each item extracted from a response.

    This base implementation is a pass-through: the record is handed back
    as-is (the very same object, not a copy).

    :param response_obj: a single record taken from the API response.
    :return: the same ``response_obj`` instance, unmodified.
    """
    record = response_obj
    return record


class BaseIncrementalStream(Stream, ABC):
@property
Expand Down Expand Up @@ -95,8 +106,9 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
latest_benchmark = self._field_to_datetime(latest_record[self.cursor_field])
if current_stream_state.get(self.cursor_field):
return {self.cursor_field: str(max(latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field])))}
return {self.cursor_field: str(latest_benchmark)}
state = max(latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field]))
return {self.cursor_field: state.strftime("%Y-%m-%dT%H:%M:%SZ")}
return {self.cursor_field: latest_benchmark.strftime("%Y-%m-%dT%H:%M:%SZ")}

def request_params(
self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs
Expand All @@ -117,6 +129,10 @@ def request_params(
def path(self, **kwargs) -> str:
return f"incremental/{self.name}"

def parse_response_obj(self, response_obj: dict) -> dict:
    """Rewrite the record's cursor value as a ``%Y-%m-%dT%H:%M:%SZ`` string.

    The value stored under ``self.cursor_field`` is parsed with
    ``pendulum.parse`` and re-formatted; the record is updated in place and
    the same object is returned.

    :param response_obj: a single record; must contain ``self.cursor_field``.
    :return: ``response_obj`` with its cursor value re-formatted.
    """
    cursor_key = self.cursor_field
    parsed_cursor = pendulum.parse(response_obj[cursor_key])
    # NOTE(review): the trailing "Z" is a literal in the format string; no
    # timezone conversion happens here -- confirm parsed values are UTC.
    response_obj[cursor_key] = parsed_cursor.strftime("%Y-%m-%dT%H:%M:%SZ")
    return response_obj


class IdIncrementalStream(BaseIncrementalStream):
cursor_field = "id"
Expand Down Expand Up @@ -167,12 +183,28 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]:
params["start_time"] = params["start_time"] * 1000000
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
    """Yield every record of the response with a surrogate ``id`` attached.

    The ``id`` is built as ``"<agent_id>|<start_time>"``; a field that is
    absent from the record contributes an empty string. Each record is
    updated in place before being yielded.

    :param response: HTTP response whose JSON body holds the stream data.
    :return: a lazy iterable over the augmented records.
    """
    payload = response.json()
    for entry in self.get_stream_data(payload):
        agent_part = str(entry.get("agent_id", ""))
        start_part = str(entry.get("start_time", ""))
        # associate the surrogate key
        entry.update({"id": "|".join((agent_part, start_part))})
        yield entry


class Accounts(Stream):
"""
Accounts Stream: https://developer.zendesk.com/rest_api/docs/chat/accounts#show-account
"""

primary_key = "account_key"

def path(self, **kwargs) -> str:
return "account"

Expand Down Expand Up @@ -237,8 +269,15 @@ class RoutingSettings(Stream):
Routing Settings Stream: https://developer.zendesk.com/rest_api/docs/chat/routing_settings#show-account-routing-settings
"""

primary_key = ""

name = "routing_settings"
data_field = "data"

def path(self, **kwargs) -> str:
def path(
self,
stream_state: Mapping[str, Any] = None,
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> str:
return "routing_settings/account"
1 change: 1 addition & 0 deletions docs/integrations/sources/zendesk-chat.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ We recommend creating a restricted, read-only key specifically for Airbyte acces

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.3 | 2021-10-21 | [7210](https://github.com/airbytehq/airbyte/pull/7210) | Fix chats stream only getting data from the first page |
| 0.1.2 | 2021-08-17 | [5476](https://github.com/airbytehq/airbyte/pull/5476) | Correct field unread to boolean type |
| 0.1.1 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support |
| 0.1.0 | 2021-05-03 | [3088](https://github.com/airbytehq/airbyte/pull/3088) | Initial release |
Expand Down