Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Hubspot: Handled 10K+ search-endpoint queries #10700

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ def _process_search(
payload = (
{
"filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}],
"sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}],
"properties": properties_list,
"limit": 100,
}
Expand Down Expand Up @@ -822,6 +823,13 @@ def read_records(
next_page_token = self.next_page_token(raw_response)
if not next_page_token:
pagination_complete = True
elif self.state and next_page_token["payload"]["after"] >= 10000:
# Hubspot documentations states that the search endpoints are limited to 10,000 total results
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
# start a new search query with the latest state that has been collected.
self._update_state(latest_cursor=latest_cursor)
next_page_token = None

self._update_state(latest_cursor=latest_cursor)
# Always return an empty generator just in case no records were ever yielded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,62 @@ def test_it_should_not_read_quotes_stream_if_it_does_not_exist_in_client(oauth_c
all_records = list(source.read(logger, config=oauth_config, catalog=configured_catalog, state=None))
records = [record for record in all_records if record.type == Type.RECORD]
assert not records


def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(requests_mock, common_params, fake_properties_list):
"""
If there are more than 10,000 records that would be returned by the Hubspot search endpoint,
the CRMSearchStream instance should stop at the 10Kth record
"""

responses = [
{
"json": {
"results": [{"id": f"{y}", "updatedAt": "2022-02-25T16:43:11Z"} for y in range(100)],
"paging": {"next": {"after": f"{x*100}",}}
},
"status_code": 200,
}
for x in range(1, 101)
]
# After reaching 10K records, it performs a new search query.
responses.extend([
{
"json": {
"results": [{"id": f"{y}", "updatedAt": "2022-03-01T00:00:00Z"} for y in range(100)],
"paging": {"next": {"after": f"{x*100}",}}
},
"status_code": 200,
}
for x in range(1, 10)
])
# Last page... it does not have paging->next->after
responses.append({
"json": {
"results": [{"id": f"{y}", "updatedAt": "2022-03-01T00:00:00Z"} for y in range(100)],
"paging": {}
},
"status_code": 200,
})

properties_response = [{
"json": [
{"name": property_name, "type": "string", "updatedAt": 1571085954360, "createdAt": 1565059306048}
for property_name in fake_properties_list
],
"status_code": 200
}]

# Create test_stream instance with some state
test_stream = Companies(**common_params)
test_stream.state = {"updatedAt": "2022-02-24T16:43:11Z"}

# Mocking Request
requests_mock.register_uri("POST", test_stream.url, responses)
requests_mock.register_uri("GET", "/properties/v2/company/properties", properties_response)
records = list(test_stream.read_records(sync_mode=SyncMode.incremental))
# The stream should not attempt to get more than 10K records.
# Instead, it should use the new state to start a new search query.
assert len(records) == 11000
assert test_stream.state['updatedAt'] == '2022-03-01T00:00:00+00:00'