🐛 Source Hubspot: Handled 10K+ search-endpoint queries (#10700)
* Handled search queries that would output more than 10K records

* Getting CRM search objects in ascending chronological order

* Fixed stream

* Fixed rebase

* Fixed condition

* Added unit test

* Removed unused import

* Started a new query when reached 10K records

* Moved comment
lgomezm authored Mar 15, 2022
1 parent e710e43 commit 710543a
Showing 2 changed files with 67 additions and 0 deletions.
@@ -763,6 +763,7 @@ def _process_search(
payload = (
    {
        "filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}],
        "sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}],
        "properties": properties_list,
        "limit": 100,
    }
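For context, this payload is what gets POSTed to HubSpot's CRM search endpoint, now with an ascending sort on the cursor field so results come back in chronological order. A minimal sketch of how such a request might be issued is below; the endpoint URL, the "companies" object type, and the search_companies helper are illustrative assumptions, not code from this commit.

from typing import Optional

import requests


def search_companies(access_token: str, payload: dict, after: Optional[int] = None) -> dict:
    # Hypothetical helper: POST the search payload to HubSpot's CRM search endpoint.
    # The object type ("companies") and URL are assumptions for illustration.
    url = "https://api.hubapi.com/crm/v3/objects/companies/search"
    body = dict(payload)
    if after is not None:
        # "after" is the paging cursor taken from paging.next.after of the previous response.
        body["after"] = after
    response = requests.post(url, json=body, headers={"Authorization": f"Bearer {access_token}"})
    response.raise_for_status()
    return response.json()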
@@ -822,6 +823,13 @@ def read_records(
    next_page_token = self.next_page_token(raw_response)
    if not next_page_token:
        pagination_complete = True
    elif self.state and next_page_token["payload"]["after"] >= 10000:
        # Hubspot documentation states that the search endpoints are limited to 10,000 total results
        # for any given query. Attempting to page beyond 10,000 will result in a 400 error.
        # https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
        # start a new search query with the latest state that has been collected.
        self._update_state(latest_cursor=latest_cursor)
        next_page_token = None

self._update_state(latest_cursor=latest_cursor)
# Always return an empty generator just in case no records were ever yielded
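This hunk is the core of the fix: when the "after" cursor reaches 10,000, the stream persists the latest cursor value and lets the next query start from that state instead of paging further. A self-contained sketch of that strategy, assuming a generic search(payload) callable and using "hs_lastmodifieddate" as an example cursor field (the connector actually uses self.last_modified_field), might look like this:

from datetime import datetime


def _to_millis(updated_at: str) -> int:
    # Convert an ISO-8601 "updatedAt" timestamp (e.g. "2022-02-25T16:43:11Z") to epoch milliseconds.
    return int(datetime.fromisoformat(updated_at.replace("Z", "+00:00")).timestamp() * 1000)


def read_all(search, properties, since_millis):
    # Issue CRM search queries sorted by the cursor field in ascending order and page
    # with the "after" cursor. HubSpot caps any single search query at 10,000 results,
    # so when the cursor reaches that cap, advance the lower bound to the latest
    # "updatedAt" seen so far and start a brand-new query from there.
    state = since_millis
    while True:
        after, latest = 0, state
        while True:
            payload = {
                "filters": [{"propertyName": "hs_lastmodifieddate", "operator": "GTE", "value": state}],
                "sorts": [{"propertyName": "hs_lastmodifieddate", "direction": "ASCENDING"}],
                "properties": properties,
                "limit": 100,
                "after": after,
            }
            response = search(payload)
            for record in response.get("results", []):
                latest = max(latest, _to_millis(record["updatedAt"]))
                yield record
            next_page = response.get("paging", {}).get("next")
            if not next_page:
                return
            after = int(next_page["after"])
            if after >= 10000:
                break
        state = latest

Because results are sorted in ascending order of the cursor field, restarting with a GTE filter on the latest value seen cannot skip records; at worst it re-emits records that share the boundary timestamp.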
@@ -296,3 +296,62 @@ def test_it_should_not_read_quotes_stream_if_it_does_not_exist_in_client(oauth_c
    all_records = list(source.read(logger, config=oauth_config, catalog=configured_catalog, state=None))
    records = [record for record in all_records if record.type == Type.RECORD]
    assert not records


def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(requests_mock, common_params, fake_properties_list):
    """
    If a Hubspot search query would return more than 10,000 records,
    the CRMSearchStream instance should stop paging at the 10,000th record
    and start a new search query from the latest collected state.
    """
    responses = [
        {
            "json": {
                "results": [{"id": f"{y}", "updatedAt": "2022-02-25T16:43:11Z"} for y in range(100)],
                "paging": {"next": {"after": f"{x * 100}"}},
            },
            "status_code": 200,
        }
        for x in range(1, 101)
    ]
    # After reaching 10K records, the stream performs a new search query.
    responses.extend(
        [
            {
                "json": {
                    "results": [{"id": f"{y}", "updatedAt": "2022-03-01T00:00:00Z"} for y in range(100)],
                    "paging": {"next": {"after": f"{x * 100}"}},
                },
                "status_code": 200,
            }
            for x in range(1, 10)
        ]
    )
    # Last page: it does not have paging->next->after.
    responses.append(
        {
            "json": {
                "results": [{"id": f"{y}", "updatedAt": "2022-03-01T00:00:00Z"} for y in range(100)],
                "paging": {},
            },
            "status_code": 200,
        }
    )

    properties_response = [
        {
            "json": [
                {"name": property_name, "type": "string", "updatedAt": 1571085954360, "createdAt": 1565059306048}
                for property_name in fake_properties_list
            ],
            "status_code": 200,
        }
    ]

    # Create a test_stream instance with some state.
    test_stream = Companies(**common_params)
    test_stream.state = {"updatedAt": "2022-02-24T16:43:11Z"}

    # Mock the requests.
    requests_mock.register_uri("POST", test_stream.url, responses)
    requests_mock.register_uri("GET", "/properties/v2/company/properties", properties_response)
    records = list(test_stream.read_records(sync_mode=SyncMode.incremental))
    # The stream should not attempt to get more than 10K records in a single query.
    # Instead, it should use the new state to start a new search query:
    # 100 pages of 100 records from the first query, plus 10 pages of 100 from the second.
    assert len(records) == 11000
    assert test_stream.state["updatedAt"] == "2022-03-01T00:00:00+00:00"
