From 710543a9abacc7578238cb5edaa47f43ed7c0431 Mon Sep 17 00:00:00 2001 From: Luis Gomez <781929+lgomezm@users.noreply.github.com> Date: Mon, 14 Mar 2022 20:17:11 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20Hubspot:=20Handled=2010?= =?UTF-8?q?K+=20search-endpoint=20queries=20(#10700)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Handled search queries that would output more than 10K records * Getting CRM search objects in ascending chronological order * Fixed stream * Fixed rebase * Fixed condition * Added unit test * Removed unused import * Started a new query when reached 10K records * Moved comment --- .../source-hubspot/source_hubspot/streams.py | 8 +++ .../source-hubspot/unit_tests/test_source.py | 59 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index 4a0e1ebf1f30..66f3afa6d6b7 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -763,6 +763,7 @@ def _process_search( payload = ( { "filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}], + "sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}], "properties": properties_list, "limit": 100, } @@ -822,6 +823,13 @@ def read_records( next_page_token = self.next_page_token(raw_response) if not next_page_token: pagination_complete = True + elif self.state and next_page_token["payload"]["after"] >= 10000: + # Hubspot documentation states that the search endpoints are limited to 10,000 total results + # for any given query. Attempting to page beyond 10,000 will result in a 400 error. + # https://developers.hubspot.com/docs/api/crm/search. 
We stop getting data at 10,000 and + # start a new search query with the latest state that has been collected. + self._update_state(latest_cursor=latest_cursor) + next_page_token = None self._update_state(latest_cursor=latest_cursor) # Always return an empty generator just in case no records were ever yielded diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py index 0e1243ad9942..f61e840fb620 100644 --- a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py @@ -296,3 +296,62 @@ def test_it_should_not_read_quotes_stream_if_it_does_not_exist_in_client(oauth_c all_records = list(source.read(logger, config=oauth_config, catalog=configured_catalog, state=None)) records = [record for record in all_records if record.type == Type.RECORD] assert not records + + +def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(requests_mock, common_params, fake_properties_list): + """ + If there are more than 10,000 records that would be returned by the Hubspot search endpoint, + the CRMSearchStream instance should stop at the 10Kth record + """ + + responses = [ + { + "json": { + "results": [{"id": f"{y}", "updatedAt": "2022-02-25T16:43:11Z"} for y in range(100)], + "paging": {"next": {"after": f"{x*100}",}} + }, + "status_code": 200, + } + for x in range(1, 101) + ] + # After reaching 10K records, it performs a new search query. + responses.extend([ + { + "json": { + "results": [{"id": f"{y}", "updatedAt": "2022-03-01T00:00:00Z"} for y in range(100)], + "paging": {"next": {"after": f"{x*100}",}} + }, + "status_code": 200, + } + for x in range(1, 10) + ]) + # Last page... 
it does not have paging->next->after + responses.append({ + "json": { + "results": [{"id": f"{y}", "updatedAt": "2022-03-01T00:00:00Z"} for y in range(100)], + "paging": {} + }, + "status_code": 200, + }) + + properties_response = [{ + "json": [ + {"name": property_name, "type": "string", "updatedAt": 1571085954360, "createdAt": 1565059306048} + for property_name in fake_properties_list + ], + "status_code": 200 + }] + + # Create test_stream instance with some state + test_stream = Companies(**common_params) + test_stream.state = {"updatedAt": "2022-02-24T16:43:11Z"} + + # Mocking Request + requests_mock.register_uri("POST", test_stream.url, responses) + requests_mock.register_uri("GET", "/properties/v2/company/properties", properties_response) + records = list(test_stream.read_records(sync_mode=SyncMode.incremental)) + # The stream should not attempt to get more than 10K records. + # Instead, it should use the new state to start a new search query. + assert len(records) == 11000 + assert test_stream.state['updatedAt'] == '2022-03-01T00:00:00+00:00' +