Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Hubspot: remove AirbyteSentry dependency #14102

Merged
merged 3 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@
- name: HubSpot
sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerRepository: airbyte/source-hubspot
dockerImageTag: 0.1.70
dockerImageTag: 0.1.71
documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot
icon: hubspot.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3658,7 +3658,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-hubspot:0.1.70"
- dockerImage: "airbyte/source-hubspot:0.1.71"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.70
LABEL io.airbyte.version=0.1.71
LABEL io.airbyte.name=airbyte/source-hubspot
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator
from airbyte_cdk.sources.utils.sentry import AirbyteSentry
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from requests import codes
from source_hubspot.errors import HubspotAccessDenied, HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout
Expand Down Expand Up @@ -337,35 +336,34 @@ def read_records(

next_page_token = None
try:
with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
while not pagination_complete:

properties = self._property_wrapper
if properties and properties.too_many_properties:
records, response = self._read_stream_records(
stream_slice=stream_slice,
stream_state=stream_state,
next_page_token=next_page_token,
)
else:
response = self.handle_request(
stream_slice=stream_slice,
stream_state=stream_state,
next_page_token=next_page_token,
properties=properties,
)
records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice))

if self.filter_old_records:
records = self._filter_old_records(records)
yield from records

next_page_token = self.next_page_token(response)
if not next_page_token:
pagination_complete = True

# Always return an empty generator just in case no records were ever yielded
yield from []
while not pagination_complete:

properties = self._property_wrapper
if properties and properties.too_many_properties:
records, response = self._read_stream_records(
stream_slice=stream_slice,
stream_state=stream_state,
next_page_token=next_page_token,
)
else:
response = self.handle_request(
stream_slice=stream_slice,
stream_state=stream_state,
next_page_token=next_page_token,
properties=properties,
)
records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice))

if self.filter_old_records:
records = self._filter_old_records(records)
yield from records

next_page_token = self.next_page_token(response)
if not next_page_token:
pagination_complete = True

# Always return an empty generator just in case no records were ever yielded
yield from []
except requests.exceptions.HTTPError as e:
raise e

Expand Down Expand Up @@ -810,43 +808,42 @@ def read_records(
next_page_token = None

latest_cursor = None
with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
while not pagination_complete:
if self.state:
records, raw_response = self._process_search(
next_page_token=next_page_token,
stream_state=stream_state,
stream_slice=stream_slice,
)
while not pagination_complete:
if self.state:
records, raw_response = self._process_search(
next_page_token=next_page_token,
stream_state=stream_state,
stream_slice=stream_slice,
)

else:
records, raw_response = self._read_stream_records(
stream_slice=stream_slice,
stream_state=stream_state,
next_page_token=next_page_token,
)
records = self._filter_old_records(records)
records = self._flat_associations(records)

for record in records:
cursor = self._field_to_datetime(record[self.updated_at_field])
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
yield record
else:
records, raw_response = self._read_stream_records(
stream_slice=stream_slice,
stream_state=stream_state,
next_page_token=next_page_token,
)
records = self._filter_old_records(records)
records = self._flat_associations(records)

for record in records:
cursor = self._field_to_datetime(record[self.updated_at_field])
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
yield record

next_page_token = self.next_page_token(raw_response)
if not next_page_token:
pagination_complete = True
elif self.state and next_page_token["payload"]["after"] >= 10000:
# Hubspot documentation states that the search endpoints are limited to 10,000 total results
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
# start a new search query with the latest state that has been collected.
self._update_state(latest_cursor=latest_cursor)
next_page_token = None

next_page_token = self.next_page_token(raw_response)
if not next_page_token:
pagination_complete = True
elif self.state and next_page_token["payload"]["after"] >= 10000:
# Hubspot documentation states that the search endpoints are limited to 10,000 total results
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
# start a new search query with the latest state that has been collected.
self._update_state(latest_cursor=latest_cursor)
next_page_token = None

self._update_state(latest_cursor=latest_cursor)
# Always return an empty generator just in case no records were ever yielded
yield from []
self._update_state(latest_cursor=latest_cursor)
# Always return an empty generator just in case no records were ever yielded
yield from []

def request_params(
self,
Expand Down Expand Up @@ -1151,33 +1148,33 @@ def read_records(

next_page_token = None
latest_cursor = None
with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
while not pagination_complete:
response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token)
records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice))

if self.filter_old_records:
records = self._filter_old_records(records)

for record in records:
cursor = self._field_to_datetime(record[self.updated_at_field])
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
yield record

next_page_token = self.next_page_token(response)
if self.state and next_page_token and next_page_token["offset"] >= 10000:
# As per Hubspot documentation, the recent engagements endpoint will only return the 10K
# most recently updated engagements. Since they are returned sorted by `lastUpdated` in
# descending order, we stop getting records if we have already reached 10,000. Attempting
# to get more than 10K will result in a HTTP 400 error.
# https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements
next_page_token = None
while not pagination_complete:
response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token)
records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice))

if not next_page_token:
pagination_complete = True
if self.filter_old_records:
records = self._filter_old_records(records)

# Always return an empty generator just in case no records were ever yielded
yield from []
for record in records:
cursor = self._field_to_datetime(record[self.updated_at_field])
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
yield record

next_page_token = self.next_page_token(response)
if self.state and next_page_token and next_page_token["offset"] >= 10000:
# As per Hubspot documentation, the recent engagements endpoint will only return the 10K
# most recently updated engagements. Since they are returned sorted by `lastUpdated` in
# descending order, we stop getting records if we have already reached 10,000. Attempting
# to get more than 10K will result in a HTTP 400 error.
# https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements
next_page_token = None

if not next_page_token:
pagination_complete = True

# Always return an empty generator just in case no records were ever yielded
yield from []

self._update_state(latest_cursor=latest_cursor)

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/hubspot.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ Now that you have set up the Mailchimp source connector, check out the following

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------|
| 0.1.71 | 2022-06-24 | [14102](https://github.com/airbytehq/airbyte/pull/14102) | Removed legacy `AirbyteSentry` dependency from the code
| 0.1.70 | 2022-06-16 | [13837](https://github.com/airbytehq/airbyte/pull/13837) | Fix the missing data in CRM streams issue |
| 0.1.69 | 2022-06-10 | [13691](https://github.com/airbytehq/airbyte/pull/13691) | Fix the `URI Too Long` issue |
| 0.1.68 | 2022-06-08 | [13596](https://github.com/airbytehq/airbyte/pull/13596) | Fix for the `property_history` which did not emit records |
Expand Down