From 21b43f56ccb9412fe09fdfc3abb84557549f3407 Mon Sep 17 00:00:00 2001 From: Baz Date: Fri, 24 Jun 2022 08:38:44 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20Hubspot:=20remove=20`Ai?= =?UTF-8?q?rbyteSentry`=20dependency=20(#14102)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fixed * updated changelog * auto-bump connector version Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-hubspot/Dockerfile | 2 +- .../source-hubspot/source_hubspot/streams.py | 175 +++++++++--------- docs/integrations/sources/hubspot.md | 1 + 5 files changed, 90 insertions(+), 92 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 5fabcbd88f63..918b0374f61c 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -398,7 +398,7 @@ - name: HubSpot sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c dockerRepository: airbyte/source-hubspot - dockerImageTag: 0.1.70 + dockerImageTag: 0.1.71 documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot icon: hubspot.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index fdc71841d992..a7e7c327cbf7 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3658,7 +3658,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-hubspot:0.1.70" +- dockerImage: "airbyte/source-hubspot:0.1.71" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-hubspot/Dockerfile b/airbyte-integrations/connectors/source-hubspot/Dockerfile index 45e2e9210f0b..4ced422087ff 100644 --- a/airbyte-integrations/connectors/source-hubspot/Dockerfile +++ b/airbyte-integrations/connectors/source-hubspot/Dockerfile @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.70 +LABEL io.airbyte.version=0.1.71 LABEL io.airbyte.name=airbyte/source-hubspot diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index 4f125ba49f7c..14a32789eb96 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -17,7 +17,6 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator -from airbyte_cdk.sources.utils.sentry import AirbyteSentry from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from requests import codes from source_hubspot.errors import HubspotAccessDenied, HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout @@ -337,35 +336,34 @@ def read_records( next_page_token = None try: - with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"): - while not pagination_complete: - - properties = self._property_wrapper - if properties and properties.too_many_properties: - records, response = self._read_stream_records( - stream_slice=stream_slice, - stream_state=stream_state, - next_page_token=next_page_token, - ) - else: - response = self.handle_request( - stream_slice=stream_slice, - stream_state=stream_state, - next_page_token=next_page_token, - properties=properties, - ) - records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)) - - if self.filter_old_records: - records = self._filter_old_records(records) - yield from records - - next_page_token = self.next_page_token(response) - if not next_page_token: - pagination_complete = True - - # Always return an empty generator just in case no records were ever yielded - yield from [] + while not pagination_complete: + + properties = self._property_wrapper + if properties and properties.too_many_properties: + records, response = self._read_stream_records( + stream_slice=stream_slice, + stream_state=stream_state, + next_page_token=next_page_token, + ) + else: + response = self.handle_request( + stream_slice=stream_slice, + stream_state=stream_state, + next_page_token=next_page_token, + properties=properties, + ) + records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)) + + if self.filter_old_records: + records = self._filter_old_records(records) + yield from records + + next_page_token = self.next_page_token(response) + if not next_page_token: + pagination_complete = True + + # Always return an empty generator just in case no records were ever yielded + yield from [] except requests.exceptions.HTTPError as e: raise e @@ -810,43 +808,42 @@ def read_records( next_page_token = None latest_cursor = None - with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"): - while not pagination_complete: - if self.state: - records, raw_response = self._process_search( - next_page_token=next_page_token, - stream_state=stream_state, - stream_slice=stream_slice, - ) + while not pagination_complete: + if self.state: + records, raw_response = self._process_search( + next_page_token=next_page_token, + stream_state=stream_state, + stream_slice=stream_slice, + ) - else: - records, raw_response = self._read_stream_records( - stream_slice=stream_slice, - stream_state=stream_state, - next_page_token=next_page_token, - ) - records = self._filter_old_records(records) - records = self._flat_associations(records) - - for record in records: - cursor = self._field_to_datetime(record[self.updated_at_field]) - latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor - yield record + else: + records, raw_response = self._read_stream_records( + stream_slice=stream_slice, + stream_state=stream_state, + next_page_token=next_page_token, + ) + records = self._filter_old_records(records) + records = self._flat_associations(records) + + for record in records: + cursor = self._field_to_datetime(record[self.updated_at_field]) + latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor + yield record + + next_page_token = self.next_page_token(raw_response) + if not next_page_token: + pagination_complete = True + elif self.state and next_page_token["payload"]["after"] >= 10000: + # Hubspot documentation states that the search endpoints are limited to 10,000 total results + # for any given query. Attempting to page beyond 10,000 will result in a 400 error. + # https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and + # start a new search query with the latest state that has been collected. + self._update_state(latest_cursor=latest_cursor) + next_page_token = None - next_page_token = self.next_page_token(raw_response) - if not next_page_token: - pagination_complete = True - elif self.state and next_page_token["payload"]["after"] >= 10000: - # Hubspot documentation states that the search endpoints are limited to 10,000 total results - # for any given query. Attempting to page beyond 10,000 will result in a 400 error. - # https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and - # start a new search query with the latest state that has been collected. - self._update_state(latest_cursor=latest_cursor) - next_page_token = None - - self._update_state(latest_cursor=latest_cursor) - # Always return an empty generator just in case no records were ever yielded - yield from [] + self._update_state(latest_cursor=latest_cursor) + # Always return an empty generator just in case no records were ever yielded + yield from [] def request_params( self, @@ -1151,33 +1148,33 @@ def read_records( next_page_token = None latest_cursor = None - with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"): - while not pagination_complete: - response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token) - records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)) - - if self.filter_old_records: - records = self._filter_old_records(records) - - for record in records: - cursor = self._field_to_datetime(record[self.updated_at_field]) - latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor - yield record - next_page_token = self.next_page_token(response) - if self.state and next_page_token and next_page_token["offset"] >= 10000: - # As per Hubspot documentation, the recent engagements endpoint will only return the 10K - # most recently updated engagements. Since they are returned sorted by `lastUpdated` in - # descending order, we stop getting records if we have already reached 10,000. Attempting - # to get more than 10K will result in a HTTP 400 error. - # https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements - next_page_token = None + while not pagination_complete: + response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token) + records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)) - if not next_page_token: - pagination_complete = True + if self.filter_old_records: + records = self._filter_old_records(records) - # Always return an empty generator just in case no records were ever yielded - yield from [] + for record in records: + cursor = self._field_to_datetime(record[self.updated_at_field]) + latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor + yield record + + next_page_token = self.next_page_token(response) + if self.state and next_page_token and next_page_token["offset"] >= 10000: + # As per Hubspot documentation, the recent engagements endpoint will only return the 10K + # most recently updated engagements. Since they are returned sorted by `lastUpdated` in + # descending order, we stop getting records if we have already reached 10,000. Attempting + # to get more than 10K will result in a HTTP 400 error. + # https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements + next_page_token = None + + if not next_page_token: + pagination_complete = True + + # Always return an empty generator just in case no records were ever yielded + yield from [] self._update_state(latest_cursor=latest_cursor) diff --git a/docs/integrations/sources/hubspot.md b/docs/integrations/sources/hubspot.md index 2e59a9522497..d544bcb80315 100644 --- a/docs/integrations/sources/hubspot.md +++ b/docs/integrations/sources/hubspot.md @@ -129,6 +129,7 @@ Now that you have set up the Mailchimp source connector, check out the following | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------| +| 0.1.71 | 2022-06-24 | [14102](https://github.com/airbytehq/airbyte/pull/14102) | Removed legacy `AirbyteSentry` dependency from the code | 0.1.70 | 2022-06-16 | [13837](https://github.com/airbytehq/airbyte/pull/13837) | Fix the missing data in CRM streams issue | | 0.1.69 | 2022-06-10 | [13691](https://github.com/airbytehq/airbyte/pull/13691) | Fix the `URI Too Long` issue | | 0.1.68 | 2022-06-08 | [13596](https://github.com/airbytehq/airbyte/pull/13596) | Fix for the `property_history` which did not emit records |