From 6cf1f09771ee8d0b0278f58a1db9f36a4cc7e9bd Mon Sep 17 00:00:00 2001 From: Luis Gomez <781929+lgomezm@users.noreply.github.com> Date: Tue, 22 Mar 2022 20:12:53 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20=20Source=20Hubspot:=20Fixed=20e?= =?UTF-8?q?ngagements=20pagination=20(#11266)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fixed engagement stream pagination * Added unit test * Removed unused import * Fixed issue if incremental engagements attempts to get more than 10K records * Fixed comment * Merged import statement in unit test --- .../source-hubspot/source_hubspot/streams.py | 56 +++++++++- .../source-hubspot/unit_tests/test_source.py | 105 +++++++++++++++++- 2 files changed, 156 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index 4b6df9268d28..db6c88cc8a52 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -824,7 +824,7 @@ def read_records( if not next_page_token: pagination_complete = True elif self.state and next_page_token["payload"]["after"] >= 10000: - # Hubspot documentations states that the search endpoints are limited to 10,000 total results + # Hubspot documentation states that the search endpoints are limited to 10,000 total results # for any given query. Attempting to page beyond 10,000 will result in a 400 error. # https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and # start a new search query with the latest state that has been collected. 
@@ -1065,7 +1065,6 @@ class Engagements(IncrementalStream): url = "/engagements/v1/engagements/paged" more_key = "hasMore" - limit = 250 updated_at_field = "lastUpdated" created_at_field = "createdAt" primary_key = "id" @@ -1085,10 +1084,59 @@ def request_params( stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, ) -> MutableMapping[str, Any]: - params = {self.limit_field: self.limit} + params = {"count": 250} + if next_page_token: + params["offset"] = next_page_token["offset"] if self.state: - params["since"] = int(self._state.timestamp() * 1000) + params.update({"since": int(self._state.timestamp() * 1000), "count": 100}) return params + + def stream_slices( + self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + return [None] + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + stream_state = stream_state or {} + pagination_complete = False + + next_page_token = None + latest_cursor = None + with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"): + while not pagination_complete: + response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token) + records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)) + + if self.filter_old_records: + records = self._filter_old_records(records) + + for record in records: + cursor = self._field_to_datetime(record[self.updated_at_field]) + latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor + yield record + + next_page_token = self.next_page_token(response) + if self.state and next_page_token and next_page_token["offset"] >= 10000: + # As per Hubspot documentation, 
the recent engagements endpoint will only return the 10K + # most recently updated engagements. Since they are returned sorted by `lastUpdated` in + # descending order, we stop getting records if we have already reached 10,000. Attempting + # to get more than 10K will result in an HTTP 400 error. + # https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements + next_page_token = None + + if not next_page_token: + pagination_complete = True + + # Always return an empty generator just in case no records were ever yielded + yield from [] + + self._update_state(latest_cursor=latest_cursor) class Forms(Stream): diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py index b79f0fbdd45a..adf3947c47ef 100644 --- a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py @@ -12,7 +12,7 @@ from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type from source_hubspot.errors import HubspotRateLimited from source_hubspot.source import SourceHubspot -from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Products, Stream, Workflows, split_properties +from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Engagements, Products, Stream, Workflows, split_properties NUMBER_OF_PROPERTIES = 2000 @@ -399,3 +399,106 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req # Instead, it should use the new state to start a new search query. assert len(records) == 11000 assert test_stream.state["updatedAt"] == "2022-03-01T00:00:00+00:00" + + +def test_engagements_stream_pagination_works(requests_mock, common_params): + """ + Tests the engagements stream handles pagination correctly, for both + full_refresh and incremental sync modes. 
+ """ + + # Mocking Request + requests_mock.register_uri("GET", "/engagements/v1/engagements/paged?hapikey=test_api_key&count=250", [ + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234593251}} for y in range(250)], + "hasMore": True, + "offset": 250 + }, + "status_code": 200, + }, + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234593251}} for y in range(250, 500)], + "hasMore": True, + "offset": 500 + }, + "status_code": 200, + }, + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595251}} for y in range(500, 600)], + "hasMore": False + }, + "status_code": 200, + } + ]) + + requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", [ + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100)], + "hasMore": True, + "offset": 100 + }, + "status_code": 200, + }, + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100, 200)], + "hasMore": True, + "offset": 200 + }, + "status_code": 200, + }, + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(200, 250)], + "hasMore": False + }, + "status_code": 200, + } + ]) + + # Create test_stream instance for full refresh. + test_stream = Engagements(**common_params) + + records = list(test_stream.read_records(sync_mode=SyncMode.full_refresh)) + # The stream should handle pagination correctly and output 600 records. + assert len(records) == 600 + assert test_stream.state["lastUpdated"] == 1641234595251 + + records = list(test_stream.read_records(sync_mode=SyncMode.incremental)) + # The stream should handle pagination correctly and output 250 records. 
+ assert len(records) == 250 + assert test_stream.state["lastUpdated"] == 1641234595252 + + +def test_incremental_engagements_stream_stops_at_10K_records(requests_mock, common_params, fake_properties_list): + """ + If there are more than 10,000 engagements that would be returned by the Hubspot recent engagements endpoint, + the Engagements instance should stop at the 10Kth record. + """ + + responses = [ + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100)], + "hasMore": True, + "offset": x*100 + }, + "status_code": 200, + } + for x in range(1, 102) + ] + + # Create test_stream instance with some state + test_stream = Engagements(**common_params) + test_stream.state = {"lastUpdated": 1641234595251} + + # Mocking Request + requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", responses) + records = list(test_stream.read_records(sync_mode=SyncMode.incremental)) + # The stream should not attempt to get more than 10K records. + assert len(records) == 10000 + assert test_stream.state["lastUpdated"] == +1641234595252