Skip to content

Commit

Permalink
🐛 Source Intercom: Fix conversations incremental pagination slowness (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bkrausz authored Mar 21, 2022
1 parent 94a862b commit ea92a24
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@
- name: Intercom
sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
dockerRepository: airbyte/source-intercom
dockerImageTag: 0.1.13
dockerImageTag: 0.1.14
documentationUrl: https://docs.airbyte.io/integrations/sources/intercom
icon: intercom.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3588,7 +3588,7 @@
oauthFlowInitParameters: []
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-intercom:0.1.13"
- dockerImage: "airbyte/source-intercom:0.1.14"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/intercom"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-intercom/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ COPY source_intercom ./source_intercom
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.13
LABEL io.airbyte.version=0.1.14
LABEL io.airbyte.name=airbyte/source-intercom
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"stream": "conversations", "data": {"type": "conversation", "id": "1", "created_at": 1607553243, "updated_at": 1626346673, "waiting_since": null, "snoozed_until": null, "source": {"type": "conversation", "id": "701718739", "delivered_as": "customer_initiated", "subject": "", "body": "<p>hey there</p>", "author": {"type": "lead", "id": "5fd150d50697b6d0bbc4a2c2", "name": null, "email": ""}, "attachments": [], "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d", "redacted": false}, "contacts": {"type": "contact.list", "contacts": [{"type": "contact", "id": "5fd150d50697b6d0bbc4a2c2"}]}, "first_contact_reply": {"created_at": 1607553243, "type": "conversation", "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d"}, "admin_assignee_id": null, "team_assignee_id": null, "open": true, "state": "open", "read": false, "tags": {"type": "tag.list", "tags": []}, "priority": "not_priority", "sla_applied": null, "statistics": {"type": "conversation_statistics", "time_to_assignment": null, "time_to_admin_reply": 4317957, "time_to_first_close": null, "time_to_last_close": null, "median_time_to_reply": 4317954, "first_contact_reply_at": 1607553243, "first_assignment_at": null, "first_admin_reply_at": 1625654131, "first_close_at": null, "last_assignment_at": null, "last_assignment_admin_reply_at": null, "last_contact_reply_at": 1607553246, "last_admin_reply_at": 1625656000, "last_close_at": null, "last_closed_by_id": null, "count_reopens": 0, "count_assignments": 0, "count_conversation_parts": 7}, "conversation_rating": null, "teammates": {"type": "admin.list", "admins": [{"type": "admin", "id": "4423433"}]}, "title": null}, "emitted_at": 1638877461000}
{"stream": "conversations", "data": {"type": "conversation", "id": "2", "created_at": 1625749234, "updated_at": 1632835061, "waiting_since": null, "snoozed_until": null, "source": {"type": "conversation", "id": "906873821", "delivered_as": "admin_initiated", "subject": "", "body": "<p>Hi Jean,</p>", "author": {"type": "admin", "id": "4423433", "name": "John Lafleur", "email": "integration-test@airbyte.io"}, "attachments": [], "url": null, "redacted": false}, "contacts": {"type": "contact.list", "contacts": [{"type": "contact", "id": "60e6f6e020ae45ce1ac86f26"}]}, "first_contact_reply": null, "admin_assignee_id": 4423433, "team_assignee_id": null, "open": false, "state": "closed", "read": true, "tags": {"type": "tag.list", "tags": []}, "priority": "not_priority", "sla_applied": null, "statistics": {"type": "conversation_statistics", "time_to_assignment": null, "time_to_admin_reply": null, "time_to_first_close": null, "time_to_last_close": null, "median_time_to_reply": null, "first_contact_reply_at": null, "first_assignment_at": null, "first_admin_reply_at": null, "first_close_at": null, "last_assignment_at": null, "last_assignment_admin_reply_at": null, "last_contact_reply_at": null, "last_admin_reply_at": null, "last_close_at": null, "last_closed_by_id": null, "count_reopens": 0, "count_assignments": 0, "count_conversation_parts": 7}, "conversation_rating": null, "teammates": {"type": "admin.list", "admins": []}, "title": null}, "emitted_at": 1638877461000}
{"stream": "conversations", "data": {"type": "conversation", "id": "1", "created_at": 1607553243, "updated_at": 1626346673, "waiting_since": null, "snoozed_until": null, "source": {"type": "conversation", "id": "701718739", "delivered_as": "customer_initiated", "subject": "", "body": "<p>hey there</p>", "author": {"type": "lead", "id": "5fd150d50697b6d0bbc4a2c2", "name": null, "email": ""}, "attachments": [], "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d", "redacted": false}, "contacts": {"type": "contact.list", "contacts": [{"type": "contact", "id": "5fd150d50697b6d0bbc4a2c2"}]}, "first_contact_reply": {"created_at": 1607553243, "type": "conversation", "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d"}, "admin_assignee_id": null, "team_assignee_id": null, "open": true, "state": "open", "read": false, "tags": {"type": "tag.list", "tags": []}, "priority": "not_priority", "sla_applied": null, "statistics": {"type": "conversation_statistics", "time_to_assignment": null, "time_to_admin_reply": 4317957, "time_to_first_close": null, "time_to_last_close": null, "median_time_to_reply": 4317954, "first_contact_reply_at": 1607553243, "first_assignment_at": null, "first_admin_reply_at": 1625654131, "first_close_at": null, "last_assignment_at": null, "last_assignment_admin_reply_at": null, "last_contact_reply_at": 1607553246, "last_admin_reply_at": 1625656000, "last_close_at": null, "last_closed_by_id": null, "count_reopens": 0, "count_assignments": 0, "count_conversation_parts": 7}, "conversation_rating": null, "teammates": {"type": "admin.list", "admins": [{"type": "admin", "id": "4423433"}]}, "title": null}, "emitted_at": 1638877461000}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_companies_scroll(stream_attributes):
# read all records
records = []
for slice in stream2.stream_slices(sync_mode=SyncMode.full_refresh):
records += list(stream2.read_records(sync_mode=SyncMode, stream_slice=slice))
records += list(stream2.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice))
assert len(records) == 3
assert (time.time() - start_time) > 60.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
class IncrementalIntercomStream(IntercomStream, ABC):
cursor_field = "updated_at"

def __init__(self, authenticator: AuthBase, start_date: str = None, **kwargs):
super().__init__(authenticator, start_date, **kwargs)
self.has_old_records = False

def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mapping[str, Any] = None) -> Iterable:
"""
Endpoint does not provide query filtering params, but they provide us
Expand All @@ -101,6 +105,8 @@ def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mappin

if not stream_state or record[self.cursor_field] > stream_state.get(self.cursor_field):
yield record
else:
self.has_old_records = True

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
record = super().parse_response(response, stream_state, **kwargs)
Expand Down Expand Up @@ -282,9 +288,16 @@ class Conversations(IncrementalIntercomStream):

def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(next_page_token, **kwargs)
params.update({"order": "asc", "sort": self.cursor_field})
params.update({"order": "desc", "sort": self.cursor_field})
return params

# We're sorting by desc. Once we hit the first page with an out-of-date result we can stop.
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self.has_old_records:
return None

return super().next_page_token(response)

def path(self, **kwargs) -> str:
return "conversations"

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/intercom.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Please read [How to get your Access Token](https://developers.intercom.com/build

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.14 | 2022-03-16 | [11208](https://github.com/airbytehq/airbyte/pull/11208) | Improve 'conversations' incremental sync speed |
| 0.1.13 | 2022-01-14 | [9513](https://github.com/airbytehq/airbyte/pull/9513) | Added handling of scroll param when it expired |
| 0.1.12 | 2021-12-14 | [8429](https://github.com/airbytehq/airbyte/pull/8429) | Updated fields and descriptions |
| 0.1.11 | 2021-12-13 | [8685](https://github.com/airbytehq/airbyte/pull/8685) | Remove time.sleep for rate limit |
Expand Down

0 comments on commit ea92a24

Please sign in to comment.