Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Hubspot: Fixed engagements pagination #11266

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ def read_records(
if not next_page_token:
pagination_complete = True
elif self.state and next_page_token["payload"]["after"] >= 10000:
# Hubspot documentations states that the search endpoints are limited to 10,000 total results
# Hubspot documentation states that the search endpoints are limited to 10,000 total results
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
# start a new search query with the latest state that has been collected.
Expand Down Expand Up @@ -1065,7 +1065,6 @@ class Engagements(IncrementalStream):

url = "/engagements/v1/engagements/paged"
more_key = "hasMore"
limit = 250
updated_at_field = "lastUpdated"
created_at_field = "createdAt"
primary_key = "id"
Expand All @@ -1085,10 +1084,59 @@ def request_params(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = {self.limit_field: self.limit}
params = {"count": 250}
if next_page_token:
params["offset"] = next_page_token["offset"]
if self.state:
params["since"] = int(self._state.timestamp() * 1000)
params.update({"since": int(self._state.timestamp() * 1000), "count": 100})
return params

def stream_slices(
    self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
    """Return a single empty slice: engagements are read in one pass, unpartitioned."""
    single_pass_slice: Optional[Mapping[str, Any]] = None
    return [single_pass_slice]

def read_records(
    self,
    sync_mode: SyncMode,
    cursor_field: List[str] = None,
    stream_slice: Mapping[str, Any] = None,
    stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
    """Read engagement records page by page, tracking the newest cursor value seen.

    Overrides the generic incremental read loop because the HubSpot
    recent-engagements endpoint cannot page past 10,000 results: when the
    stream has state (incremental sync) pagination is cut off at that limit
    instead of requesting a page that would fail with HTTP 400.

    Yields each (transformed, optionally age-filtered) record, then persists
    the largest `lastUpdated` cursor observed via `_update_state`.
    """
    stream_state = stream_state or {}
    pagination_complete = False

    next_page_token = None
    # Largest `updated_at_field` value seen so far; becomes the new stream state.
    latest_cursor = None
    with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
        while not pagination_complete:
            response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token)
            records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice))

            if self.filter_old_records:
                records = self._filter_old_records(records)

            for record in records:
                cursor = self._field_to_datetime(record[self.updated_at_field])
                # First record initializes the cursor; afterwards keep the max.
                latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
                yield record

            next_page_token = self.next_page_token(response)
            if self.state and next_page_token and next_page_token["offset"] >= 10000:
                # As per Hubspot documentation, the recent engagements endpoint will only return the 10K
                # most recently updated engagements. Since they are returned sorted by `lastUpdated` in
                # descending order, we stop getting records if we have already reached 10,000. Attempting
                # to get more than 10K will result in a HTTP 400 error.
                # https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements
                next_page_token = None

            if not next_page_token:
                pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    self._update_state(latest_cursor=latest_cursor)


class Forms(Stream):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type
from source_hubspot.errors import HubspotRateLimited
from source_hubspot.source import SourceHubspot
from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Products, Stream, Workflows, split_properties
from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Engagements, Products, Stream, Workflows, split_properties

NUMBER_OF_PROPERTIES = 2000

Expand Down Expand Up @@ -399,3 +399,106 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req
# Instead, it should use the new state to start a new search query.
assert len(records) == 11000
assert test_stream.state["updatedAt"] == "2022-03-01T00:00:00+00:00"


def test_engagements_stream_pagination_works(requests_mock, common_params):
    """
    Tests the engagements stream handles pagination correctly, for both
    full_refresh and incremental sync modes.

    Full refresh uses the `/paged` endpoint (count=250, 3 pages: 250+250+100 = 600
    records); once state is set, incremental uses the `/recent/modified` endpoint
    (count=100, 3 pages: 100+100+50 = 250 records).
    """

    # Mocking Request: full-refresh endpoint, 3 pages of 250/250/100 records.
    requests_mock.register_uri("GET", "/engagements/v1/engagements/paged?hapikey=test_api_key&count=250", [
        {
            "json": {
                "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234593251}} for y in range(250)],
                "hasMore": True,
                "offset": 250
            },
            "status_code": 200,
        },
        {
            "json": {
                "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234593251}} for y in range(250, 500)],
                "hasMore": True,
                "offset": 500
            },
            "status_code": 200,
        },
        {
            "json": {
                # Last page carries a newer `lastUpdated`, which becomes the stream state.
                "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595251}} for y in range(500, 600)],
                "hasMore": False
            },
            "status_code": 200,
        }
    ])

    # Incremental endpoint, 3 pages of 100/100/50 records.
    requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", [
        {
            "json": {
                "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100)],
                "hasMore": True,
                "offset": 100
            },
            "status_code": 200,
        },
        {
            "json": {
                "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100, 200)],
                "hasMore": True,
                "offset": 200
            },
            "status_code": 200,
        },
        {
            "json": {
                "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(200, 250)],
                "hasMore": False
            },
            "status_code": 200,
        }
    ])

    # Create test_stream instance for full refresh.
    test_stream = Engagements(**common_params)

    records = list(test_stream.read_records(sync_mode=SyncMode.full_refresh))
    # The stream should handle pagination correctly and output 600 records.
    assert len(records) == 600
    assert test_stream.state["lastUpdated"] == 1641234595251

    records = list(test_stream.read_records(sync_mode=SyncMode.incremental))
    # With state now set, the incremental endpoint is used and outputs 250 records.
    assert len(records) == 250
    assert test_stream.state["lastUpdated"] == 1641234595252


def test_incremental_engagements_stream_stops_at_10K_records(requests_mock, common_params, fake_properties_list):
    """
    If there are more than 10,000 engagements that would be returned by the Hubspot recent engagements endpoint,
    the Engagements instance should stop at the 10Kth record.
    """

    # 101 mocked pages of 100 records each; the stream must stop after the
    # 100th page (10,000 records) and never request the 101st.
    responses = [
        {
            "json": {
                "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100)],
                "hasMore": True,
                "offset": x * 100
            },
            "status_code": 200,
        }
        for x in range(1, 102)
    ]

    # Create test_stream instance with some state so the incremental (recent/modified) endpoint is used.
    test_stream = Engagements(**common_params)
    test_stream.state = {"lastUpdated": 1641234595251}

    # Mocking Request
    requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", responses)
    records = list(test_stream.read_records(sync_mode=SyncMode.incremental))
    # The stream should not attempt to get more than 10K records.
    assert len(records) == 10000
    # State advances to the newest `lastUpdated` seen in the mocked responses.
    # (Fixed: the expected literal previously carried a stray unary "+",
    # a diff-prefix artifact fused into the number.)
    assert test_stream.state["lastUpdated"] == 1641234595252