diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 6ad5de1ce01c..991550fab39c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -356,7 +356,7 @@ - name: Intercom sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a dockerRepository: airbyte/source-intercom - dockerImageTag: 0.1.13 + dockerImageTag: 0.1.14 documentationUrl: https://docs.airbyte.io/integrations/sources/intercom icon: intercom.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 2dcf7000209c..d7a0bce81dce 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3588,7 +3588,7 @@ oauthFlowInitParameters: [] oauthFlowOutputParameters: - - "access_token" -- dockerImage: "airbyte/source-intercom:0.1.13" +- dockerImage: "airbyte/source-intercom:0.1.14" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/intercom" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-intercom/Dockerfile b/airbyte-integrations/connectors/source-intercom/Dockerfile index 2ae375afc8c4..6942fc0122c2 100644 --- a/airbyte-integrations/connectors/source-intercom/Dockerfile +++ b/airbyte-integrations/connectors/source-intercom/Dockerfile @@ -35,5 +35,5 @@ COPY source_intercom ./source_intercom ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.13 +LABEL io.airbyte.version=0.1.14 LABEL io.airbyte.name=airbyte/source-intercom diff --git a/airbyte-integrations/connectors/source-intercom/integration_tests/expected_records.txt b/airbyte-integrations/connectors/source-intercom/integration_tests/expected_records.txt index 5599db1de79e..d7fd268a9c89 100644 --- a/airbyte-integrations/connectors/source-intercom/integration_tests/expected_records.txt +++ b/airbyte-integrations/connectors/source-intercom/integration_tests/expected_records.txt @@ -1,2 +1,2 @@ -{"stream": "conversations", "data": {"type": "conversation", "id": "1", "created_at": 1607553243, "updated_at": 1626346673, "waiting_since": null, "snoozed_until": null, "source": {"type": "conversation", "id": "701718739", "delivered_as": "customer_initiated", "subject": "", "body": "

hey there

", "author": {"type": "lead", "id": "5fd150d50697b6d0bbc4a2c2", "name": null, "email": ""}, "attachments": [], "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d", "redacted": false}, "contacts": {"type": "contact.list", "contacts": [{"type": "contact", "id": "5fd150d50697b6d0bbc4a2c2"}]}, "first_contact_reply": {"created_at": 1607553243, "type": "conversation", "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d"}, "admin_assignee_id": null, "team_assignee_id": null, "open": true, "state": "open", "read": false, "tags": {"type": "tag.list", "tags": []}, "priority": "not_priority", "sla_applied": null, "statistics": {"type": "conversation_statistics", "time_to_assignment": null, "time_to_admin_reply": 4317957, "time_to_first_close": null, "time_to_last_close": null, "median_time_to_reply": 4317954, "first_contact_reply_at": 1607553243, "first_assignment_at": null, "first_admin_reply_at": 1625654131, "first_close_at": null, "last_assignment_at": null, "last_assignment_admin_reply_at": null, "last_contact_reply_at": 1607553246, "last_admin_reply_at": 1625656000, "last_close_at": null, "last_closed_by_id": null, "count_reopens": 0, "count_assignments": 0, "count_conversation_parts": 7}, "conversation_rating": null, "teammates": {"type": "admin.list", "admins": [{"type": "admin", "id": "4423433"}]}, "title": null}, "emitted_at": 1638877461000} {"stream": "conversations", "data": {"type": "conversation", "id": "2", "created_at": 1625749234, "updated_at": 1632835061, "waiting_since": null, "snoozed_until": null, "source": {"type": "conversation", "id": "906873821", "delivered_as": "admin_initiated", "subject": "", "body": "

Hi Jean,

", "author": {"type": "admin", "id": "4423433", "name": "John Lafleur", "email": "integration-test@airbyte.io"}, "attachments": [], "url": null, "redacted": false}, "contacts": {"type": "contact.list", "contacts": [{"type": "contact", "id": "60e6f6e020ae45ce1ac86f26"}]}, "first_contact_reply": null, "admin_assignee_id": 4423433, "team_assignee_id": null, "open": false, "state": "closed", "read": true, "tags": {"type": "tag.list", "tags": []}, "priority": "not_priority", "sla_applied": null, "statistics": {"type": "conversation_statistics", "time_to_assignment": null, "time_to_admin_reply": null, "time_to_first_close": null, "time_to_last_close": null, "median_time_to_reply": null, "first_contact_reply_at": null, "first_assignment_at": null, "first_admin_reply_at": null, "first_close_at": null, "last_assignment_at": null, "last_assignment_admin_reply_at": null, "last_contact_reply_at": null, "last_admin_reply_at": null, "last_close_at": null, "last_closed_by_id": null, "count_reopens": 0, "count_assignments": 0, "count_conversation_parts": 7}, "conversation_rating": null, "teammates": {"type": "admin.list", "admins": []}, "title": null}, "emitted_at": 1638877461000} +{"stream": "conversations", "data": {"type": "conversation", "id": "1", "created_at": 1607553243, "updated_at": 1626346673, "waiting_since": null, "snoozed_until": null, "source": {"type": "conversation", "id": "701718739", "delivered_as": "customer_initiated", "subject": "", "body": "

hey there

", "author": {"type": "lead", "id": "5fd150d50697b6d0bbc4a2c2", "name": null, "email": ""}, "attachments": [], "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d", "redacted": false}, "contacts": {"type": "contact.list", "contacts": [{"type": "contact", "id": "5fd150d50697b6d0bbc4a2c2"}]}, "first_contact_reply": {"created_at": 1607553243, "type": "conversation", "url": "http://localhost:63342/airbyte-python/airbyte-integrations/bases/base-java/build/tmp/expandedArchives/org.jacoco.agent-0.8.5.jar_6a2df60c47de373ea127d14406367999/about.html?_ijt=uosck1k6vmp2dnl4oqib2g3u9d"}, "admin_assignee_id": null, "team_assignee_id": null, "open": true, "state": "open", "read": false, "tags": {"type": "tag.list", "tags": []}, "priority": "not_priority", "sla_applied": null, "statistics": {"type": "conversation_statistics", "time_to_assignment": null, "time_to_admin_reply": 4317957, "time_to_first_close": null, "time_to_last_close": null, "median_time_to_reply": 4317954, "first_contact_reply_at": 1607553243, "first_assignment_at": null, "first_admin_reply_at": 1625654131, "first_close_at": null, "last_assignment_at": null, "last_assignment_admin_reply_at": null, "last_contact_reply_at": 1607553246, "last_admin_reply_at": 1625656000, "last_close_at": null, "last_closed_by_id": null, "count_reopens": 0, "count_assignments": 0, "count_conversation_parts": 7}, "conversation_rating": null, "teammates": {"type": "admin.list", "admins": [{"type": "admin", "id": "4423433"}]}, "title": null}, "emitted_at": 1638877461000} diff --git a/airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py index 1bf0f2c761fb..9c7ad78e62a7 100644 --- a/airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py @@ -93,7 +93,7 @@ def test_companies_scroll(stream_attributes): # read all records records = [] for slice in stream2.stream_slices(sync_mode=SyncMode.full_refresh): - records += list(stream2.read_records(sync_mode=SyncMode, stream_slice=slice)) + records += list(stream2.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice)) assert len(records) == 3 assert (time.time() - start_time) > 60.0 diff --git a/airbyte-integrations/connectors/source-intercom/source_intercom/source.py b/airbyte-integrations/connectors/source-intercom/source_intercom/source.py index 8f99e9f473a5..f36c76c32dd4 100755 --- a/airbyte-integrations/connectors/source-intercom/source_intercom/source.py +++ b/airbyte-integrations/connectors/source-intercom/source_intercom/source.py @@ -92,6 +92,10 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str, class IncrementalIntercomStream(IntercomStream, ABC): cursor_field = "updated_at" + def __init__(self, authenticator: AuthBase, start_date: str = None, **kwargs): + super().__init__(authenticator, start_date, **kwargs) + self.has_old_records = False + def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mapping[str, Any] = None) -> Iterable: """ Endpoint does not provide query filtering params, but they provide us @@ -101,6 +105,8 @@ def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mappin if not stream_state or record[self.cursor_field] > stream_state.get(self.cursor_field): yield record + else: + self.has_old_records = True def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: record = super().parse_response(response, stream_state, **kwargs) @@ -282,9 +288,16 @@ class Conversations(IncrementalIntercomStream): def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: params = super().request_params(next_page_token, **kwargs) - params.update({"order": "asc", "sort": self.cursor_field}) + params.update({"order": "desc", "sort": self.cursor_field}) return params + # We're sorting by desc. Once we hit the first page with an out-of-date result we can stop. + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + if self.has_old_records: + return None + + return super().next_page_token(response) + def path(self, **kwargs) -> str: return "conversations" diff --git a/docs/integrations/sources/intercom.md b/docs/integrations/sources/intercom.md index 537dba6b0a3f..61bbbe8b18ca 100644 --- a/docs/integrations/sources/intercom.md +++ b/docs/integrations/sources/intercom.md @@ -55,6 +55,7 @@ Please read [How to get your Access Token](https://developers.intercom.com/build | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.14 | 2022-03-16 | [11208](https://github.com/airbytehq/airbyte/pull/11208) | Improve 'conversations' incremental sync speed | | 0.1.13 | 2022-01-14 | [9513](https://github.com/airbytehq/airbyte/pull/9513) | Added handling of scroll param when it expired | | 0.1.12 | 2021-12-14 | [8429](https://github.com/airbytehq/airbyte/pull/8429) | Updated fields and descriptions | | 0.1.11 | 2021-12-13 | [8685](https://github.com/airbytehq/airbyte/pull/8685) | Remove time.sleep for rate limit |