🐛 Source Hubspot: Fixed engagements pagination (#11266)
* Fixed engagement stream pagination

* Added unit test

* Removed unused import

* Fixed issue where an incremental engagements sync attempts to get more than 10K records (see the sketch below)

* Fixed comment

* Merged import statement in unit test
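
In essence, the fix caps incremental pagination at HubSpot's 10,000-record limit for the recent engagements endpoint. A condensed sketch of the check added to read_records in the diff below (illustrative only, not the full method):

    next_page_token = self.next_page_token(response)
    if self.state and next_page_token and next_page_token["offset"] >= 10000:
        # The recent engagements endpoint only returns the 10,000 most recently
        # updated engagements; paging past that returns an HTTP 400.
        next_page_token = None
    if not next_page_token:
        pagination_complete = True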
lgomezm authored Mar 23, 2022
1 parent 893e5de commit 6cf1f09
Showing 2 changed files with 156 additions and 5 deletions.
@@ -824,7 +824,7 @@ def read_records(
if not next_page_token:
pagination_complete = True
elif self.state and next_page_token["payload"]["after"] >= 10000:
# Hubspot documentations states that the search endpoints are limited to 10,000 total results
# Hubspot documentation states that the search endpoints are limited to 10,000 total results
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
# start a new search query with the latest state that has been collected.
@@ -1065,7 +1065,6 @@ class Engagements(IncrementalStream):

url = "/engagements/v1/engagements/paged"
more_key = "hasMore"
limit = 250
updated_at_field = "lastUpdated"
created_at_field = "createdAt"
primary_key = "id"
@@ -1085,10 +1084,59 @@ def request_params(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = {self.limit_field: self.limit}
params = {"count": 250}
if next_page_token:
params["offset"] = next_page_token["offset"]
if self.state:
params["since"] = int(self._state.timestamp() * 1000)
params.update({"since": int(self._state.timestamp() * 1000), "count": 100})
return params

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
return [None]

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
pagination_complete = False

next_page_token = None
latest_cursor = None
with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
while not pagination_complete:
response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token)
records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice))

if self.filter_old_records:
records = self._filter_old_records(records)

for record in records:
cursor = self._field_to_datetime(record[self.updated_at_field])
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
yield record

next_page_token = self.next_page_token(response)
if self.state and next_page_token and next_page_token["offset"] >= 10000:
# As per Hubspot documentation, the recent engagements endpoint will only return the 10K
# most recently updated engagements. Since they are returned sorted by `lastUpdated` in
# descending order, we stop getting records if we have already reached 10,000. Attempting
# to get more than 10K will result in an HTTP 400 error.
# https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements
next_page_token = None

if not next_page_token:
pagination_complete = True

# Always return an empty generator just in case no records were ever yielded
yield from []

self._update_state(latest_cursor=latest_cursor)


class Forms(Stream):
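For reference, the request parameters produced by the updated request_params above, condensed into a short sketch (illustrative only; the endpoints shown are the ones the new unit test below mocks, and "offset" is added on subsequent pages):

    # Full refresh (no state yet): GET /engagements/v1/engagements/paged
    params = {"count": 250}

    # Incremental (state present): GET /engagements/v1/engagements/recent/modified
    # "since" is the stored cursor in epoch milliseconds, e.g. 1641234595251
    params = {"count": 100, "since": int(self._state.timestamp() * 1000)}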
@@ -12,7 +12,7 @@
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type
from source_hubspot.errors import HubspotRateLimited
from source_hubspot.source import SourceHubspot
from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Products, Stream, Workflows, split_properties
from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Engagements, Products, Stream, Workflows, split_properties

NUMBER_OF_PROPERTIES = 2000

@@ -399,3 +399,106 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req
# Instead, it should use the new state to start a new search query.
assert len(records) == 11000
assert test_stream.state["updatedAt"] == "2022-03-01T00:00:00+00:00"


def test_engagements_stream_pagination_works(requests_mock, common_params):
"""
Tests that the engagements stream handles pagination correctly, for both
full_refresh and incremental sync modes.
"""

# Mocking Request
requests_mock.register_uri("GET", "/engagements/v1/engagements/paged?hapikey=test_api_key&count=250", [
{
"json": {
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234593251}} for y in range(250)],
"hasMore": True,
"offset": 250
},
"status_code": 200,
},
{
"json": {
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234593251}} for y in range(250, 500)],
"hasMore": True,
"offset": 500
},
"status_code": 200,
},
{
"json": {
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595251}} for y in range(500, 600)],
"hasMore": False
},
"status_code": 200,
}
])

requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", [
{
"json": {
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100)],
"hasMore": True,
"offset": 100
},
"status_code": 200,
},
{
"json": {
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100, 200)],
"hasMore": True,
"offset": 200
},
"status_code": 200,
},
{
"json": {
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(200, 250)],
"hasMore": False
},
"status_code": 200,
}
])

# Create test_stream instance for full refresh.
test_stream = Engagements(**common_params)

records = list(test_stream.read_records(sync_mode=SyncMode.full_refresh))
# The stream should handle pagination correctly and output 600 records.
assert len(records) == 600
assert test_stream.state["lastUpdated"] == 1641234595251

records = list(test_stream.read_records(sync_mode=SyncMode.incremental))
# The stream should handle pagination correctly and output 250 records.
assert len(records) == 250
assert test_stream.state["lastUpdated"] == 1641234595252


def test_incremental_engagements_stream_stops_at_10K_records(requests_mock, common_params, fake_properties_list):
"""
If there are more than 10,000 engagements that would be returned by the Hubspot recent engagements endpoint,
the Engagements stream should stop after the 10,000th record.
"""

responses = [
{
"json": {
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100)],
"hasMore": True,
"offset": x*100
},
"status_code": 200,
}
for x in range(1, 102)
]

# Create test_stream instance with some state
test_stream = Engagements(**common_params)
test_stream.state = {"lastUpdated": 1641234595251}

# Mocking Request
requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", responses)
records = list(test_stream.read_records(sync_mode=SyncMode.incremental))
# The stream should not attempt to get more than 10K records.
assert len(records) == 10000
assert test_stream.state["lastUpdated"] == 1641234595252
