diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index 6a14b8f6b1b4..ce5aec1f55c4 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -763,7 +763,6 @@ def _process_search( payload = ( { "filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}], - "sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}], "properties": properties_list, "limit": 100, } @@ -845,17 +844,6 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, payload = {} if "paging" in response and "next" in response["paging"] and "after" in response["paging"]["next"]: -<<<<<<< HEAD - params["after"] = int(response["paging"]["next"]["after"]) - payload["after"] = int(response["paging"]["next"]["after"]) - - return {"params": params, "payload": payload} - - def stream_slices( - self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None - ) -> Iterable[Optional[Mapping[str, Any]]]: - return [None] -======= # Hubspot documentations states that the search endpoints are limited to 10,000 total results # for any given query. Attempting to page beyond 10,000 will result in a 400 error. # https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000, so that @@ -865,7 +853,11 @@ def stream_slices( params["after"] = after payload["after"] = after return {"params": params, "payload": payload} ->>>>>>> 3ff68c120 (Handled search queries that would output more than 10K records) + + def stream_slices( + self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + return [None] class CRMObjectStream(Stream): @@ -1014,34 +1006,6 @@ def request_params( return params -class DealStageHistoryStream(Stream): - """Deal stage history, API v1 - Deal stage history is exposed by the v1 API, but not the v3 API. - The v1 endpoint requires the contacts scope. - Docs: https://legacydocs.hubspot.com/docs/methods/deals/get-all-deals - """ - - url = "/deals/v1/deal/paged" - more_key = "hasMore" - data_field = "deals" - updated_at_field = "timestamp" - - def _transform(self, records: Iterable) -> Iterable: - for record in super()._transform(records): - dealstage = record.get("properties", {}).get("dealstage", {}) - updated_at = dealstage.get(self.updated_at_field) - if updated_at: - yield {"id": record.get("dealId"), "dealstage": dealstage, self.updated_at_field: updated_at} - - def request_params( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> MutableMapping[str, Any]: - return {"propertiesWithHistory": "dealstage"} - - class Deals(CRMSearchStream): """Deals, API v3""" @@ -1049,27 +1013,6 @@ class Deals(CRMSearchStream): last_modified_field = "hs_lastmodifieddate" associations = ["contacts", "companies"] - def __init__(self, **kwargs): - super().__init__(**kwargs) - self._stage_history = DealStageHistoryStream(**kwargs) - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Mapping[str, Any]]: - history_by_id = {} - for record in self._stage_history.read_records(sync_mode): - if all(field in record for field in ("id", "dealstage")): - history_by_id[record["id"]] = record["dealstage"] - - for record in super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state): - if record.get("id") and int(record["id"]) in history_by_id: - record["dealstage"] = history_by_id[int(record["id"])] - yield record - class DealPipelines(Stream): """Deal pipelines, API v1,