From 2517ecd684f013bd28ec49a6a2e5c16912a41e04 Mon Sep 17 00:00:00 2001 From: lgomezm Date: Fri, 18 Mar 2022 09:22:46 -0500 Subject: [PATCH 1/6] Fixed engagement stream pagination --- .../source-hubspot/source_hubspot/streams.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index 4b6df9268d28..2de345cd5c51 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -13,6 +13,7 @@ import backoff import pendulum as pendulum +from pytest import param import requests from airbyte_cdk.entrypoint import logger from airbyte_cdk.models import SyncMode @@ -267,6 +268,7 @@ def handle_request( if params: request_params.update(params) + logger.info(f'next_page_request: {next_page_token} - params: {request_params}') request = self._create_prepared_request( path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), headers=dict(request_headers, **self.authenticator.get_auth_header()), @@ -1065,7 +1067,6 @@ class Engagements(IncrementalStream): url = "/engagements/v1/engagements/paged" more_key = "hasMore" - limit = 250 updated_at_field = "lastUpdated" created_at_field = "createdAt" primary_key = "id" @@ -1085,11 +1086,17 @@ def request_params( stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, ) -> MutableMapping[str, Any]: - params = {self.limit_field: self.limit} + params = {"count": 250} + if next_page_token: + params["offset"] = next_page_token["offset"] if self.state: - params["since"] = int(self._state.timestamp() * 1000) + params.update({"since": int(self._state.timestamp() * 1000), "count": 100}) return params - + + def stream_slices( + self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + return [None] class Forms(Stream): """Marketing Forms, API v3 From d0e3a3bb7715bf5abce3bc292edc2c27d6adf93b Mon Sep 17 00:00:00 2001 From: lgomezm Date: Fri, 18 Mar 2022 14:25:19 -0500 Subject: [PATCH 2/6] Added unit test --- .../source-hubspot/source_hubspot/streams.py | 1 - .../source-hubspot/unit_tests/test_source.py | 74 +++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index 2de345cd5c51..b2883032b413 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -268,7 +268,6 @@ def handle_request( if params: request_params.update(params) - logger.info(f'next_page_request: {next_page_token} - params: {request_params}') request = self._create_prepared_request( path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), headers=dict(request_headers, **self.authenticator.get_auth_header()), diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py index b79f0fbdd45a..e893497b41da 100644 --- a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py @@ -13,6 +13,7 @@ from source_hubspot.errors import HubspotRateLimited from source_hubspot.source import SourceHubspot from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Products, Stream, Workflows, split_properties +from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Engagements, Products, Workflows, split_properties NUMBER_OF_PROPERTIES = 2000 @@ -399,3 +400,76 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req # Instead, it should use the new state to start a new search query. assert len(records) == 11000 assert test_stream.state["updatedAt"] == "2022-03-01T00:00:00+00:00" + + +def test_engagements_stream_pagination_works(requests_mock, common_params): + """ + If there are more than 10,000 records that would be returned by the Hubspot search endpoint, + the CRMSearchStream instance should stop at the 10Kth record + """ + + # Mocking Request + requests_mock.register_uri("GET", "/engagements/v1/engagements/paged?hapikey=test_api_key&count=250", [ + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234593251}} for y in range(250)], + "hasMore": True, + "offset": 250 + }, + "status_code": 200, + }, + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234593251}} for y in range(250, 500)], + "hasMore": True, + "offset": 500 + }, + "status_code": 200, + }, + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595251}} for y in range(500, 600)], + "hasMore": False + }, + "status_code": 200, + } + ]) + + requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", [ + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100)], + "hasMore": True, + "offset": 100 + }, + "status_code": 200, + }, + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100, 200)], + "hasMore": True, + "offset": 200 + }, + "status_code": 200, + }, + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(200, 250)], + "hasMore": False + }, + "status_code": 200, + } + ]) + + # Create test_stream instance for full refresh. + test_stream = Engagements(**common_params) + + records = list(test_stream.read_records(sync_mode=SyncMode.full_refresh)) + # The stream should handle pagination correctly and output 600 records. + assert len(records) == 600 + assert test_stream.state["lastUpdated"] == 1641234595251 + + records = list(test_stream.read_records(sync_mode=SyncMode.incremental)) + # The stream should handle pagination correctly and output 600 records. + assert len(records) == 250 + assert test_stream.state["lastUpdated"] == 1641234595252 From 770ac56049c40554216778826fbb0561db286258 Mon Sep 17 00:00:00 2001 From: lgomezm Date: Fri, 18 Mar 2022 15:00:16 -0500 Subject: [PATCH 3/6] Removed unused import --- .../connectors/source-hubspot/source_hubspot/streams.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index b2883032b413..d63553216998 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -13,7 +13,6 @@ import backoff import pendulum as pendulum -from pytest import param import requests from airbyte_cdk.entrypoint import logger from airbyte_cdk.models import SyncMode From 47929d4cf99c380e6886a1490e3710023afef92e Mon Sep 17 00:00:00 2001 From: lgomezm Date: Fri, 18 Mar 2022 16:41:38 -0500 Subject: [PATCH 4/6] Fixed issue if incremental engagements attempts to get more than 10K records --- .../source-hubspot/source_hubspot/streams.py | 45 ++++++++++++++++++- .../source-hubspot/unit_tests/test_source.py | 35 ++++++++++++++- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index d63553216998..db6c88cc8a52 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -824,7 +824,7 @@ def read_records( if not next_page_token: pagination_complete = True elif self.state and next_page_token["payload"]["after"] >= 10000: - # Hubspot documentations states that the search endpoints are limited to 10,000 total results + # Hubspot documentation states that the search endpoints are limited to 10,000 total results # for any given query. Attempting to page beyond 10,000 will result in a 400 error. # https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and # start a new search query with the latest state that has been collected. @@ -1095,6 +1095,49 @@ def stream_slices( self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Any]]]: return [None] + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + stream_state = stream_state or {} + pagination_complete = False + + next_page_token = None + latest_cursor = None + with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"): + while not pagination_complete: + response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token) + records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)) + + if self.filter_old_records: + records = self._filter_old_records(records) + + for record in records: + cursor = self._field_to_datetime(record[self.updated_at_field]) + latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor + yield record + + next_page_token = self.next_page_token(response) + if self.state and next_page_token and next_page_token["offset"] >= 10000: + # As per Hubspot documentation, the recent engagements endpoint will only return the 10K + # most recently updated engagements. Since they are returned sorted by `lastUpdated` in + # descending order, we stop getting records if we have already reached 10,000. Attempting + # to get more than 10K will result in a HTTP 400 error. + # https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements + next_page_token = None + + if not next_page_token: + pagination_complete = True + + # Always return an empty generator just in case no records were ever yielded + yield from [] + + self._update_state(latest_cursor=latest_cursor) + class Forms(Stream): """Marketing Forms, API v3 diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py index e893497b41da..839b4e857ed2 100644 --- a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py @@ -404,8 +404,8 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req def test_engagements_stream_pagination_works(requests_mock, common_params): """ - If there are more than 10,000 records that would be returned by the Hubspot search endpoint, - the CRMSearchStream instance should stop at the 10Kth record + Tests the engagements stream handles pagination correctly, for both + full_refresh and incremental sync modes. """ # Mocking Request @@ -473,3 +473,34 @@ def test_engagements_stream_pagination_works(requests_mock, common_params): # The stream should handle pagination correctly and output 600 records. assert len(records) == 250 assert test_stream.state["lastUpdated"] == 1641234595252 + + +def test_incremental_engagements_stream_stops_at_10K_records(requests_mock, common_params, fake_properties_list): + """ + If there are more than 10,000 engagements that would be returned by the Hubspot recent engagements endpoint, + the Engagements instance should stop at the 10Kth record. + """ + + responses = [ + { + "json": { + "results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100)], + "hasMore": True, + "offset": x*100 + }, + "status_code": 200, + } + for x in range(1, 102) + ] + + # Create test_stream instance with some state + test_stream = Engagements(**common_params) + test_stream.state = {"lastUpdated": 1641234595251} + + # Mocking Request + requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", responses) + records = list(test_stream.read_records(sync_mode=SyncMode.incremental)) + # The stream should not attempt to get more than 10K records. + # Instead, it should use the new state to start a new search query. + assert len(records) == 10000 + assert test_stream.state["lastUpdated"] == +1641234595252 From 930a185628f4a15c6298651f30400086d53c02f6 Mon Sep 17 00:00:00 2001 From: lgomezm Date: Fri, 18 Mar 2022 17:15:37 -0500 Subject: [PATCH 5/6] Fixed comment --- .../connectors/source-hubspot/unit_tests/test_source.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py index 839b4e857ed2..7d42675f085f 100644 --- a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py @@ -501,6 +501,5 @@ def test_incremental_engagements_stream_stops_at_10K_records(requests_mock, comm requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", responses) records = list(test_stream.read_records(sync_mode=SyncMode.incremental)) # The stream should not attempt to get more than 10K records. - # Instead, it should use the new state to start a new search query. assert len(records) == 10000 assert test_stream.state["lastUpdated"] == +1641234595252 From 50da5321894f45abfee99c9d93aa7d9e44440368 Mon Sep 17 00:00:00 2001 From: lgomezm Date: Tue, 22 Mar 2022 14:41:27 -0500 Subject: [PATCH 6/6] Merged import statement in unit test --- .../connectors/source-hubspot/unit_tests/test_source.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py index 7d42675f085f..adf3947c47ef 100644 --- a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py @@ -12,8 +12,7 @@ from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type from source_hubspot.errors import HubspotRateLimited from source_hubspot.source import SourceHubspot -from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Products, Stream, Workflows, split_properties -from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Engagements, Products, Workflows, split_properties +from source_hubspot.streams import API, PROPERTIES_PARAM_MAX_LENGTH, Companies, Deals, Engagements, Products, Stream, Workflows, split_properties NUMBER_OF_PROPERTIES = 2000