Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Intercom: Fix conversations incremental pagination slowness #11208

Merged
merged 6 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@
- name: Intercom
sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
dockerRepository: airbyte/source-intercom
dockerImageTag: 0.1.13
dockerImageTag: 0.1.14
documentationUrl: https://docs.airbyte.io/integrations/sources/intercom
icon: intercom.svg
sourceType: api
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-intercom/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ COPY source_intercom ./source_intercom
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.13
LABEL io.airbyte.version=0.1.14
LABEL io.airbyte.name=airbyte/source-intercom
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"stream": "conversations", "data": {"type": "conversation", "id": "1", "created_at": 1607553243, "updated_at": 1626346673, "waiting_since": null, "snoozed_until": null, "source": {"type": "conversation", "id": "701718739", "delivered_as": "customer_initiated", "subject": "", "body": "<p>hey there</p>", "author": {"type": "lead", "id": "5fd150d50697b6d0bbc4a2c2", "name": null, "email": ""}, "attachments": [], "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d", "redacted": false}, "contacts": {"type": "contact.list", "contacts": [{"type": "contact", "id": "5fd150d50697b6d0bbc4a2c2"}]}, "first_contact_reply": {"created_at": 1607553243, "type": "conversation", "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d"}, "admin_assignee_id": null, "team_assignee_id": null, "open": true, "state": "open", "read": false, "tags": {"type": "tag.list", "tags": []}, "priority": "not_priority", "sla_applied": null, "statistics": {"type": "conversation_statistics", "time_to_assignment": null, "time_to_admin_reply": 4317957, "time_to_first_close": null, "time_to_last_close": null, "median_time_to_reply": 4317954, "first_contact_reply_at": 1607553243, "first_assignment_at": null, "first_admin_reply_at": 1625654131, "first_close_at": null, "last_assignment_at": null, "last_assignment_admin_reply_at": null, "last_contact_reply_at": 1607553246, "last_admin_reply_at": 1625656000, "last_close_at": null, "last_closed_by_id": null, "count_reopens": 0, "count_assignments": 0, "count_conversation_parts": 7}, "conversation_rating": null, "teammates": {"type": "admin.list", "admins": [{"type": "admin", "id": "4423433"}]}, "title": null}, "emitted_at": 1638877461000}
{"stream": "conversations", "data": {"type": "conversation", "id": "2", "created_at": 1625749234, "updated_at": 1632835061, "waiting_since": null, "snoozed_until": null, "source": {"type": "conversation", "id": "906873821", "delivered_as": "admin_initiated", "subject": "", "body": "<p>Hi Jean,</p>", "author": {"type": "admin", "id": "4423433", "name": "John Lafleur", "email": "integration-test@airbyte.io"}, "attachments": [], "url": null, "redacted": false}, "contacts": {"type": "contact.list", "contacts": [{"type": "contact", "id": "60e6f6e020ae45ce1ac86f26"}]}, "first_contact_reply": null, "admin_assignee_id": 4423433, "team_assignee_id": null, "open": false, "state": "closed", "read": true, "tags": {"type": "tag.list", "tags": []}, "priority": "not_priority", "sla_applied": null, "statistics": {"type": "conversation_statistics", "time_to_assignment": null, "time_to_admin_reply": null, "time_to_first_close": null, "time_to_last_close": null, "median_time_to_reply": null, "first_contact_reply_at": null, "first_assignment_at": null, "first_admin_reply_at": null, "first_close_at": null, "last_assignment_at": null, "last_assignment_admin_reply_at": null, "last_contact_reply_at": null, "last_admin_reply_at": null, "last_close_at": null, "last_closed_by_id": null, "count_reopens": 0, "count_assignments": 0, "count_conversation_parts": 7}, "conversation_rating": null, "teammates": {"type": "admin.list", "admins": []}, "title": null}, "emitted_at": 1638877461000}
{"stream": "conversations", "data": {"type": "conversation", "id": "1", "created_at": 1607553243, "updated_at": 1626346673, "waiting_since": null, "snoozed_until": null, "source": {"type": "conversation", "id": "701718739", "delivered_as": "customer_initiated", "subject": "", "body": "<p>hey there</p>", "author": {"type": "lead", "id": "5fd150d50697b6d0bbc4a2c2", "name": null, "email": ""}, "attachments": [], "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d", "redacted": false}, "contacts": {"type": "contact.list", "contacts": [{"type": "contact", "id": "5fd150d50697b6d0bbc4a2c2"}]}, "first_contact_reply": {"created_at": 1607553243, "type": "conversation", "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d"}, "admin_assignee_id": null, "team_assignee_id": null, "open": true, "state": "open", "read": false, "tags": {"type": "tag.list", "tags": []}, "priority": "not_priority", "sla_applied": null, "statistics": {"type": "conversation_statistics", "time_to_assignment": null, "time_to_admin_reply": 4317957, "time_to_first_close": null, "time_to_last_close": null, "median_time_to_reply": 4317954, "first_contact_reply_at": 1607553243, "first_assignment_at": null, "first_admin_reply_at": 1625654131, "first_close_at": null, "last_assignment_at": null, "last_assignment_admin_reply_at": null, "last_contact_reply_at": 1607553246, "last_admin_reply_at": 1625656000, "last_close_at": null, "last_closed_by_id": null, "count_reopens": 0, "count_assignments": 0, "count_conversation_parts": 7}, "conversation_rating": null, "teammates": {"type": "admin.list", "admins": [{"type": "admin", "id": "4423433"}]}, "title": null}, "emitted_at": 1638877461000}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_companies_scroll(stream_attributes):
# read all records
records = []
for slice in stream2.stream_slices(sync_mode=SyncMode.full_refresh):
records += list(stream2.read_records(sync_mode=SyncMode, stream_slice=slice))
records += list(stream2.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice))
alafanechere marked this conversation as resolved.
Show resolved Hide resolved
assert len(records) == 3
assert (time.time() - start_time) > 60.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
class IncrementalIntercomStream(IntercomStream, ABC):
cursor_field = "updated_at"

def __init__(self, authenticator: AuthBase, start_date: Optional[str] = None, **kwargs):
    """Initialize the stream and its "old record seen" flag.

    ``authenticator``, ``start_date`` and any extra keyword arguments are
    forwarded unchanged to the parent constructor.
    """
    super().__init__(authenticator, start_date, **kwargs)
    # Starts False; filter_by_state flips it to True the first time it drops a
    # record whose cursor value is not newer than the saved stream state.
    self.has_old_records = False

def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mapping[str, Any] = None) -> Iterable:
"""
Endpoint does not provide query filtering params, but they provide us
Expand All @@ -101,6 +105,8 @@ def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mappin

if not stream_state or record[self.cursor_field] > stream_state.get(self.cursor_field):
yield record
else:
self.has_old_records = True

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
record = super().parse_response(response, stream_state, **kwargs)
Expand Down Expand Up @@ -280,10 +286,12 @@ class Conversations(IncrementalIntercomStream):

data_fields = ["conversations"]

def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
    """Build the query parameters for a conversations request.

    Starts from the parent's parameters and asks for results sorted by the
    cursor field, newest first.

    Descending order is set explicitly because next_page_token stops
    paginating as soon as a record that is not newer than the saved state
    has been seen. That early stop is only correct when newer records are
    returned before older ones; with ascending order it would end the sync
    before any new record is reached.
    """
    params = super().request_params(next_page_token, **kwargs)
    # Explicit "desc" rather than relying on whatever the API default is.
    params.update({"order": "desc", "sort": self.cursor_field})
    return params
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you removed this because the default ordering is `{"order": "desc", "sort": "updated_at"}`. I think you should instead explicitly set `params.update({"order": "desc", "sort": self.cursor_field})`, for safety in case the API changes and to make the code easier to understand.

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    """Return the next page token, or None when pagination should stop.

    Results are expected in descending cursor order, so once a page has
    produced an out-of-date record (``has_old_records`` is set) every later
    page would be out of date too and there is nothing left to fetch.
    Otherwise the decision is delegated to the parent implementation.
    """
    return None if self.has_old_records else super().next_page_token(response)
alafanechere marked this conversation as resolved.
Show resolved Hide resolved

def path(self, **kwargs) -> str:
    """Return the endpoint path for this stream; keyword arguments are accepted but not used."""
    return "conversations"
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/intercom.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Please read [How to get your Access Token](https://developers.intercom.com/build

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.14 | 2022-03-16 | [11208](https://github.com/airbytehq/airbyte/pull/11208) | Improve 'conversations' incremental sync speed |
| 0.1.13 | 2022-01-14 | [9513](https://github.com/airbytehq/airbyte/pull/9513) | Added handling of scroll param when it expired |
| 0.1.12 | 2021-12-14 | [8429](https://github.com/airbytehq/airbyte/pull/8429) | Updated fields and descriptions |
| 0.1.11 | 2021-12-13 | [8685](https://github.com/airbytehq/airbyte/pull/8685) | Remove time.sleep for rate limit |
Expand Down