diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/36c891d9-4bd9-43ac-bad2-10e12756272c.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/36c891d9-4bd9-43ac-bad2-10e12756272c.json index b307efc88425..28595536e48b 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/36c891d9-4bd9-43ac-bad2-10e12756272c.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/36c891d9-4bd9-43ac-bad2-10e12756272c.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "36c891d9-4bd9-43ac-bad2-10e12756272c", "name": "HubSpot", "dockerRepository": "airbyte/source-hubspot", - "dockerImageTag": "0.1.32", + "dockerImageTag": "0.1.33", "documentationUrl": "https://docs.airbyte.io/integrations/sources/hubspot", "icon": "hubspot.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 452238a59e0a..9347a5dc1119 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -293,7 +293,7 @@ - name: HubSpot sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c dockerRepository: airbyte/source-hubspot - dockerImageTag: 0.1.32 + dockerImageTag: 0.1.33 documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot icon: hubspot.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 294e4080a640..723f59f8fdd6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2872,7 +2872,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-hubspot:0.1.32" +- dockerImage: "airbyte/source-hubspot:0.1.33" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-hubspot/Dockerfile b/airbyte-integrations/connectors/source-hubspot/Dockerfile index e51182b850dd..2f786fa76129 100644 --- a/airbyte-integrations/connectors/source-hubspot/Dockerfile +++ b/airbyte-integrations/connectors/source-hubspot/Dockerfile @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.32 +LABEL io.airbyte.version=0.1.33 LABEL io.airbyte.name=airbyte/source-hubspot diff --git a/airbyte-integrations/connectors/source-hubspot/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-hubspot/integration_tests/abnormal_state.json index 5944b5d50c7a..5cf26f89b23d 100644 --- a/airbyte-integrations/connectors/source-hubspot/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-hubspot/integration_tests/abnormal_state.json @@ -14,6 +14,9 @@ "email_events": { "timestamp": "2221-10-12T13:37:56.412000+00:00" }, + "engagements": { + "lastUpdated": 7945393076412 + }, "line_items": { "updatedAt": "2221-10-12T13:37:56.412000+00:00" }, diff --git a/airbyte-integrations/connectors/source-hubspot/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-hubspot/sample_files/configured_catalog.json index b6157948c296..c3cbea5bfa4e 100644 --- 
a/airbyte-integrations/connectors/source-hubspot/sample_files/configured_catalog.json +++ b/airbyte-integrations/connectors/source-hubspot/sample_files/configured_catalog.json @@ -82,10 +82,13 @@ "stream": { "name": "engagements", "json_schema": {}, - "supported_sync_modes": ["full_refresh"] + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["lastUpdated"] }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" + "sync_mode": "incremental", + "cursor_field": ["lastUpdated"], + "destination_sync_mode": "append" }, { "stream": { diff --git a/airbyte-integrations/connectors/source-hubspot/sample_files/sample_state.json b/airbyte-integrations/connectors/source-hubspot/sample_files/sample_state.json index a652b9bb2301..a78d97590de2 100644 --- a/airbyte-integrations/connectors/source-hubspot/sample_files/sample_state.json +++ b/airbyte-integrations/connectors/source-hubspot/sample_files/sample_state.json @@ -14,6 +14,9 @@ "email_events": { "timestamp": "2021-02-23T00:00:00Z" }, + "engagements": { + "lastUpdated": 1614038400000 + }, "line_items": { "updatedAt": "2021-02-23T00:00:00Z" }, diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py index 8dedd91d8200..4f12dbc2a988 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py @@ -93,14 +93,14 @@ def giveup_handler(exc): ) -def retry_after_handler(**kwargs): +def retry_after_handler(fixed_retry_after=None, **kwargs): """Retry helper when we hit the call limit, sleeps for specific duration""" def sleep_on_ratelimit(_details): _, exc, _ = sys.exc_info() if isinstance(exc, HubspotRateLimited): # HubSpot API does not always return Retry-After value for 429 HTTP error - retry_after = int(exc.response.headers.get("Retry-After", 3)) + retry_after = fixed_retry_after if fixed_retry_after else int(exc.response.headers.get("Retry-After", 3)) logger.info(f"Rate limit reached. Sleeping for {retry_after} seconds") time.sleep(retry_after + 1) # extra second to cover any fractions of second @@ -216,7 +216,7 @@ def name(self) -> str: stream_name = stream_name[: -len("Stream")] return stream_name - def list(self, fields) -> Iterable: + def list_records(self, fields) -> Iterable: yield from self.read(partial(self._api.get, url=self.url)) @staticmethod @@ -309,6 +309,27 @@ def _filter_old_records(self, records: Iterable) -> Iterable: continue yield record + def _read_stream_records( + self, getter: Callable, properties_list: List[str], params: MutableMapping[str, Any] = None + ) -> Tuple[dict, Any]: + # TODO: Additional processing was added due to the fact that users receive 414 errors while syncing their streams (issues #3977 and #5835). + # We will need to fix this code when the HubSpot developers add the ability to use a special parameter to get all properties for an entity. + # According to HubSpot Community (https://community.hubspot.com/t5/APIs-Integrations/Get-all-contact-properties-without-explicitly-listing-them/m-p/447950) + # and the official documentation, this does not exist at the moment. 
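The `_read_stream_records` helper being extracted here addresses the HTTP 414 errors referenced in the TODO (issues #3977 and #5835): requesting every HubSpot property in a single GET can push the URL past the server's length limit. Below is a minimal standalone sketch of the same idea: split the property list into URL-sized chunks, fetch the page once per chunk, and merge the partial `properties` payloads by record id. The `fetch_page` callable, the `read_with_split_properties` name, and the chunk budget are illustrative stand-ins, not the connector's actual helpers.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List, Mapping


def split_properties(properties: List[str], max_chars: int = 15000) -> Iterator[List[str]]:
    """Yield chunks of property names whose comma-joined length stays under a URL-safe budget."""
    chunk: List[str] = []
    size = 0
    for prop in properties:
        if chunk and size + len(prop) + 1 > max_chars:
            yield chunk
            chunk, size = [], 0
        chunk.append(prop)
        size += len(prop) + 1  # account for the comma separator
    if chunk:
        yield chunk


def read_with_split_properties(
    fetch_page: Callable[[Mapping[str, Any]], Iterable[Mapping[str, Any]]],
    properties: List[str],
) -> Dict[str, Dict[str, Any]]:
    """Fetch the same page once per property chunk and merge the results by record id."""
    merged: Dict[str, Dict[str, Any]] = {}
    for chunk in split_properties(properties):
        for record in fetch_page({"properties": ",".join(chunk)}):
            if record["id"] not in merged:
                merged[record["id"]] = dict(record)
                merged[record["id"]]["properties"] = dict(record.get("properties", {}))
            else:
                merged[record["id"]]["properties"].update(record.get("properties", {}))
    return merged
```

The real method additionally returns the last HTTP response so `_read` can keep following the v3 `paging.next.after` cursor across the merged page.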
+ stream_records = {} + response = None + + for properties in split_properties(properties_list): + params.update({"properties": ",".join(properties)}) + response = getter(params=params) + for record in self._transform(self.parse_response(response)): + if record["id"] not in stream_records: + stream_records[record["id"]] = record + elif stream_records[record["id"]].get("properties"): + stream_records[record["id"]]["properties"].update(record.get("properties", {})) + + return stream_records, response + def _read(self, getter: Callable, params: MutableMapping[str, Any] = None) -> Iterator: next_page_token = None while True: @@ -317,21 +338,7 @@ def _read(self, getter: Callable, params: MutableMapping[str, Any] = None) -> It properties_list = list(self.properties.keys()) if properties_list: - # TODO: Additional processing was added due to the fact that users receive 414 errors while syncing their streams (issues #3977 and #5835). - # We will need to fix this code when the HubSpot developers add the ability to use a special parameter to get all properties for an entity. - # According to HubSpot Community (https://community.hubspot.com/t5/APIs-Integrations/Get-all-contact-properties-without-explicitly-listing-them/m-p/447950) - # and the official documentation, this does not exist at the moment. - stream_records = {} - - for properties in split_properties(properties_list): - params.update({"properties": ",".join(properties)}) - response = getter(params=params) - for record in self._transform(self.parse_response(response)): - if record["id"] not in stream_records: - stream_records[record["id"]] = record - elif stream_records[record["id"]].get("properties"): - stream_records[record["id"]]["properties"].update(record.get("properties", {})) - + stream_records, response = self._read_stream_records(getter=getter, params=params, properties_list=properties_list) yield from [value for key, value in stream_records.items()] else: response = getter(params=params) @@ -427,6 +434,26 @@ def properties(self) -> Mapping[str, Any]: return props + def _flat_associations(self, records: Iterable[MutableMapping]) -> Iterable[MutableMapping]: + """When result has associations we prefer to have it flat, so we transform this: + + "associations": { + "contacts": { + "results": [{"id": "201", "type": "company_to_contact"}, {"id": "251", "type": "company_to_contact"}]} + } + } + + to this: + + "contacts": [201, 251] + """ + for record in records: + if "associations" in record: + associations = record.pop("associations") + for name, association in associations.items(): + record[name] = [row["id"] for row in association.get("results", [])] + yield record + class IncrementalStream(Stream, ABC): """Stream that supports state and incremental read""" @@ -472,6 +499,9 @@ def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: cursor = self._field_to_datetime(record[self.updated_at_field]) latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor + self._update_state(latest_cursor=latest_cursor) + + def _update_state(self, latest_cursor): if latest_cursor: new_state = max(latest_cursor, self._state) if self._state else latest_cursor if new_state != self._state: @@ -498,6 +528,92 @@ def read_chunked( yield from super().read(getter, params) +class CRMSearchStream(IncrementalStream, ABC): + + limit = 100 # This value is used only when state is None. 
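`_flat_associations` is hoisted into the shared base class above so that both `CRMObjectStream` and the new `CRMSearchStream` can reuse it (the copy that previously lived on `CRMObjectStream` is removed further down). A self-contained sketch of the same transformation, with a hypothetical `flat_associations` function standing in for the method:

```python
from typing import Any, Iterable, Iterator, MutableMapping


def flat_associations(records: Iterable[MutableMapping[str, Any]]) -> Iterator[MutableMapping[str, Any]]:
    """Replace the nested `associations` block with one `<name>: [ids]` list per association."""
    for record in records:
        for name, association in record.pop("associations", {}).items():
            record[name] = [row["id"] for row in association.get("results", [])]
        yield record


company = {
    "id": "100",
    "associations": {
        "contacts": {
            "results": [
                {"id": "201", "type": "company_to_contact"},
                {"id": "251", "type": "company_to_contact"},
            ]
        }
    },
}
print(next(flat_associations([company])))  # {'id': '100', 'contacts': ['201', '251']}
```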
+ state_pk = "updatedAt" + updated_at_field = "updatedAt" + + @property + def url(self): + return f"/crm/v3/objects/{self.entity}/search" if self.state else f"/crm/v3/objects/{self.entity}" + + def __init__( + self, + entity: Optional[str] = None, + last_modified_field: Optional[str] = None, + associations: Optional[List[str]] = None, + include_archived_only: bool = False, + **kwargs, + ): + super().__init__(**kwargs) + self._state = None + self.entity = entity + self.last_modified_field = last_modified_field + self.associations = associations + self._include_archived_only = include_archived_only + + @retry_connection_handler(max_tries=5, factor=5) + @retry_after_handler(fixed_retry_after=1, max_tries=3) + def search( + self, url: str, data: Mapping[str, Any], params: MutableMapping[str, Any] = None + ) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]: + # We can safely retry this POST call, because it's a search operation. + # Given Hubspot does not return any Retry-After header (https://developers.hubspot.com/docs/api/crm/search) + # from the search endpoint, it waits one second after trying again. + # As per their docs: `These search endpoints are rate limited to four requests per second per authentication token`. + return self._api.post(url=url, data=data, params=params) + + def list_records(self, fields) -> Iterable: + params = { + "archived": str(self._include_archived_only).lower(), + "associations": self.associations, + } + if self.state: + generator = self.read(partial(self.search, url=self.url), params) + else: + generator = self.read(partial(self._api.get, url=self.url), params) + yield from self._flat_associations(self._filter_old_records(generator)) + + def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: + """Apply state filter to set of records, update cursor(state) if necessary in the end""" + latest_cursor = None + default_params = {"limit": self.limit} + params = {**default_params, **params} if params else {**default_params} + properties_list = list(self.properties.keys()) + + payload = ( + { + "filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}], + "properties": properties_list, + "limit": 100, + } + if self.state + else {} + ) + + while True: + stream_records = {} + if self.state: + response = getter(data=payload) + for record in self._transform(self.parse_response(response)): + stream_records[record["id"]] = record + else: + stream_records, response = self._read_stream_records(getter=getter, params=params, properties_list=properties_list) + + for _, record in stream_records.items(): + yield record + cursor = self._field_to_datetime(record[self.updated_at_field]) + latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor + if "paging" in response and "next" in response["paging"] and "after" in response["paging"]["next"]: + params["after"] = response["paging"]["next"]["after"] + payload["after"] = response["paging"]["next"]["after"] + else: + break + + self._update_state(latest_cursor=latest_cursor) + + class CRMObjectStream(Stream): """Unified stream interface for CRM objects. 
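When state exists, `CRMSearchStream` above POSTs to `/crm/v3/objects/{entity}/search` with a `GTE` filter on the object's last-modified property and follows the `paging.next.after` cursor. Below is a rough, self-contained sketch of that request loop; the `post` callable and the `search_modified_since` wrapper are assumptions for illustration, and the retries, property chunking, and rate limiting handled by the real stream are omitted:

```python
from datetime import datetime, timezone
from typing import Any, Callable, Iterator, List, Mapping


def search_modified_since(
    post: Callable[[str, Mapping[str, Any]], Mapping[str, Any]],
    entity: str,
    last_modified_field: str,
    properties: List[str],
    state: datetime,
    limit: int = 100,
) -> Iterator[Mapping[str, Any]]:
    """Yield CRM objects modified at or after `state`, following search paging cursors."""
    payload: dict = {
        "filters": [
            {
                "propertyName": last_modified_field,
                "operator": "GTE",
                "value": int(state.timestamp() * 1000),  # epoch milliseconds
            }
        ],
        "properties": properties,
        "limit": limit,
    }
    while True:
        response = post(f"/crm/v3/objects/{entity}/search", payload)
        yield from response.get("results", [])
        after = response.get("paging", {}).get("next", {}).get("after")
        if not after:
            break
        payload["after"] = after


# Hypothetical usage with an already-authenticated `api.post(url, data)` helper:
# records = search_modified_since(api.post, "deal", "hs_lastmodifieddate",
#                                 ["dealname", "dealstage"],
#                                 datetime(2021, 2, 23, tzinfo=timezone.utc))
```

Without saved state, the stream instead falls back to a plain GET against `/crm/v3/objects/{entity}` and the chunked-properties read shown earlier.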
You need to provide `entity` parameter to read concrete stream, possible values are: @@ -528,7 +644,7 @@ def __init__( if not self.entity: raise ValueError("Entity must be set either on class or instance level") - def list(self, fields) -> Iterable: + def list_records(self, fields) -> Iterable: params = { "archived": str(self._include_archived_only).lower(), "associations": self.associations, @@ -536,26 +652,6 @@ def list(self, fields) -> Iterable: generator = self.read(partial(self._api.get, url=self.url), params) yield from self._flat_associations(generator) - def _flat_associations(self, records: Iterable[MutableMapping]) -> Iterable[MutableMapping]: - """When result has associations we prefer to have it flat, so we transform this: - - "associations": { - "contacts": { - "results": [{"id": "201", "type": "company_to_contact"}, {"id": "251", "type": "company_to_contact"}]} - } - } - - to this: - - "contacts": [201, 251] - """ - for record in records: - if "associations" in record: - associations = record.pop("associations") - for name, association in associations.items(): - record[name] = [row["id"] for row in association.get("results", [])] - yield record - class CRMObjectIncrementalStream(CRMObjectStream, IncrementalStream): state_pk = "updatedAt" @@ -575,7 +671,7 @@ class CampaignStream(Stream): limit = 500 updated_at_field = "lastUpdatedTime" - def list(self, fields) -> Iterable: + def list_records(self, fields) -> Iterable: for row in self.read(getter=partial(self._api.get, url=self.url)): record = self._api.get(f"/email/public/v1/campaigns/{row['id']}") yield {**row, **record} @@ -623,7 +719,7 @@ def _transform(self, records: Iterable) -> Iterable: for item in record.get("list-memberships", []): yield {"canonical-vid": canonical_vid, **item} - def list(self, fields) -> Iterable: + def list_records(self, fields) -> Iterable: """Receiving all contacts with list memberships""" params = {"showListMemberships": True} yield from self.read(partial(self._api.get, url=self.url), params) @@ -648,24 +744,24 @@ def _transform(self, records: Iterable) -> Iterable: if updated_at: yield {"id": record.get("dealId"), "dealstage": dealstage, self.updated_at_field: updated_at} - def list(self, fields) -> Iterable: + def list_records(self, fields) -> Iterable: params = {"propertiesWithHistory": "dealstage"} yield from self.read(partial(self._api.get, url=self.url), params) -class DealStream(CRMObjectIncrementalStream): +class DealStream(CRMSearchStream): """Deals, API v3""" def __init__(self, **kwargs): - super().__init__(entity="deal", **kwargs) + super().__init__(entity="deal", last_modified_field="hs_lastmodifieddate", **kwargs) self._stage_history = DealStageHistoryStream(**kwargs) - def list(self, fields) -> Iterable: + def list_records(self, fields) -> Iterable: history_by_id = {} - for record in self._stage_history.list(fields): + for record in self._stage_history.list_records(fields): if all(field in record for field in ("id", "dealstage")): history_by_id[record["id"]] = record["dealstage"] - for record in super().list(fields): + for record in super().list_records(fields): if record.get("id") and int(record["id"]) in history_by_id: record["dealstage"] = history_by_id[int(record["id"])] yield record @@ -705,9 +801,10 @@ class EmailEventStream(IncrementalStream): created_at_field = "created" -class EngagementStream(Stream): +class EngagementStream(IncrementalStream): """Engagements, API v1 Docs: https://legacydocs.hubspot.com/docs/methods/engagements/get-all-engagements + 
https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements """ url = "/engagements/v1/engagements/paged" @@ -715,10 +812,50 @@ class EngagementStream(Stream): limit = 250 updated_at_field = "lastUpdated" created_at_field = "createdAt" + state_pk = "lastUpdated" + + @property + def url(self): + if self.state: + return "/engagements/v1/engagements/recent/modified" + return "/engagements/v1/engagements/paged" + + @property + def state(self) -> Optional[Mapping[str, Any]]: + """Current state, if wasn't set return None""" + return {self.state_pk: self._state} if self._state else None + + @state.setter + def state(self, value): + state = value[self.state_pk] + self._state = state + self._start_date = max(self._field_to_datetime(self._state), self._start_date) def _transform(self, records: Iterable) -> Iterable: yield from super()._transform({**record.pop("engagement"), **record} for record in records) + def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: + max_last_updated_at = None + default_params = {self.limit_field: self.limit} + params = {**default_params, **params} if params else {**default_params} + if self.state: + params["since"] = self._state + count = 0 + for record in self._filter_old_records(self._read(getter, params)): + yield record + count += 1 + cursor = record[self.updated_at_field] + max_last_updated_at = max(cursor, max_last_updated_at) if max_last_updated_at else cursor + + logger.info(f"Processed {count} records") + + if max_last_updated_at: + new_state = max(max_last_updated_at, self._state) if self._state else max_last_updated_at + if new_state != self._state: + logger.info(f"Advancing bookmark for engagement stream from {self._state} to {max_last_updated_at}") + self._state = new_state + self._start_date = self._state + class FormStream(Stream): """Marketing Forms, API v3 @@ -753,7 +890,7 @@ def _transform(self, records: Iterable) -> Iterable: yield record - def list(self, fields) -> Iterable: + def list_records(self, fields) -> Iterable: for form in self.read(getter=partial(self._api.get, url="/marketing/v3/forms")): for submission in self.read(getter=partial(self._api.get, url=f"{self.url}/{form['id']}")): submission["formId"] = form["id"] diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/client.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/client.py index 7768755fc387..80e97632e26e 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/client.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/client.py @@ -14,6 +14,7 @@ ContactListStream, ContactsListMembershipsStream, CRMObjectIncrementalStream, + CRMSearchStream, DealPipelineStream, DealStream, EmailEventStream, @@ -37,9 +38,13 @@ def __init__(self, start_date, credentials, **kwargs): common_params = dict(api=self._api, start_date=self._start_date) self._apis = { "campaigns": CampaignStream(**common_params), - "companies": CRMObjectIncrementalStream(entity="company", associations=["contacts"], **common_params), + "companies": CRMSearchStream( + entity="company", last_modified_field="hs_lastmodifieddate", associations=["contacts"], **common_params + ), "contact_lists": ContactListStream(**common_params), - "contacts": CRMObjectIncrementalStream(entity="contact", **common_params), + "contacts": CRMSearchStream( + entity="contact", last_modified_field="lastmodifieddate", associations=["contacts"], **common_params + ), "contacts_list_memberships": 
ContactsListMembershipsStream(**common_params), "deal_pipelines": DealPipelineStream(**common_params), "deals": DealStream(associations=["contacts"], **common_params), @@ -63,7 +68,7 @@ def __init__(self, start_date, credentials, **kwargs): super().__init__(**kwargs) def _enumerate_methods(self) -> Mapping[str, Callable]: - return {name: api.list for name, api in self._apis.items()} + return {name: api.list_records for name, api in self._apis.items()} @property def streams(self) -> Iterator[AirbyteStream]: diff --git a/docs/integrations/sources/hubspot.md b/docs/integrations/sources/hubspot.md index 79e463b79434..ffa18c68000c 100644 --- a/docs/integrations/sources/hubspot.md +++ b/docs/integrations/sources/hubspot.md @@ -110,8 +110,10 @@ If you are using Oauth, most of the streams require the appropriate [scopes](htt | Version | Date | Pull Request | Subject | |:--------|:-----------| :--- |:---------------------------------------------------------------------------------------------------------------------------------------------| +| 0.1.33 | 2022-01-14 | [8887](https://github.com/airbytehq/airbyte/pull/8887) | More efficient support for incremental updates on Companies, Contacts, Deals and Engagements streams | | 0.1.32 | 2022-01-13 | [8011](https://github.com/airbytehq/airbyte/pull/8011) | Add new stream form_submissions | | 0.1.31 | 2022-01-11 | [9385](https://github.com/airbytehq/airbyte/pull/9385) | Remove auto-generated `properties` from `Engagements` stream | +| 0.1.30 | 2022-01-10 | [9129](https://github.com/airbytehq/airbyte/pull/9129) | Created Contacts list memberships streams | | 0.1.29 | 2021-12-17 | [8699](https://github.com/airbytehq/airbyte/pull/8699) | Add incremental sync support for `companies`, `contact_lists`, `contacts`, `deals`, `line_items`, `products`, `quotes`, `tickets` streams | | 0.1.28 | 2021-12-15 | [8429](https://github.com/airbytehq/airbyte/pull/8429) | Update fields and descriptions |
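The Engagements part of the 0.1.33 entry maps to the `EngagementStream` changes above: once a `lastUpdated` bookmark exists, the stream reads `/engagements/v1/engagements/recent/modified` with a `since` parameter instead of the full `/engagements/v1/engagements/paged` listing. A rough sketch of that endpoint selection, assuming a generic `get` callable; the page-size parameter name and the `read_engagements` wrapper are illustrative, and the connector's paging, start-date filtering, and record-count logging are left out:

```python
from typing import Any, Callable, Iterable, Iterator, Mapping, Optional


def read_engagements(
    get: Callable[[str, Mapping[str, Any]], Iterable[Mapping[str, Any]]],
    state: Optional[int],
    limit: int = 250,
) -> Iterator[Mapping[str, Any]]:
    """Read engagements, switching to the 'recent/modified' endpoint once a bookmark exists."""
    params: dict = {"limit": limit}  # page-size key mirrors the stream's `limit_field`
    if state:
        url = "/engagements/v1/engagements/recent/modified"
        params["since"] = state  # epoch milliseconds, from the saved `lastUpdated` bookmark
    else:
        url = "/engagements/v1/engagements/paged"

    max_last_updated = state or 0
    for record in get(url, params):
        max_last_updated = max(max_last_updated, record.get("lastUpdated", 0))
        yield record
    # The caller would persist `max_last_updated` as the new `lastUpdated` state.
```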