Skip to content

Commit

Permalink
Fixed rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
lgomezm committed Mar 11, 2022
1 parent 46bb1ab commit 159c313
Showing 1 changed file with 5 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,6 @@ def _process_search(
payload = (
{
"filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}],
"sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}],
"properties": properties_list,
"limit": 100,
}
Expand Down Expand Up @@ -845,17 +844,6 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
payload = {}

if "paging" in response and "next" in response["paging"] and "after" in response["paging"]["next"]:
<<<<<<< HEAD
params["after"] = int(response["paging"]["next"]["after"])
payload["after"] = int(response["paging"]["next"]["after"])

return {"params": params, "payload": payload}

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
return [None]
=======
# Hubspot documentations states that the search endpoints are limited to 10,000 total results
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000, so that
Expand All @@ -865,7 +853,11 @@ def stream_slices(
params["after"] = after
payload["after"] = after
return {"params": params, "payload": payload}
>>>>>>> 3ff68c120 (Handled search queries that would output more than 10K records)

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
return [None]


class CRMObjectStream(Stream):
Expand Down Expand Up @@ -1014,62 +1006,13 @@ def request_params(
return params


class DealStageHistoryStream(Stream):
"""Deal stage history, API v1
Deal stage history is exposed by the v1 API, but not the v3 API.
The v1 endpoint requires the contacts scope.
Docs: https://legacydocs.hubspot.com/docs/methods/deals/get-all-deals
"""

url = "/deals/v1/deal/paged"
more_key = "hasMore"
data_field = "deals"
updated_at_field = "timestamp"

def _transform(self, records: Iterable) -> Iterable:
for record in super()._transform(records):
dealstage = record.get("properties", {}).get("dealstage", {})
updated_at = dealstage.get(self.updated_at_field)
if updated_at:
yield {"id": record.get("dealId"), "dealstage": dealstage, self.updated_at_field: updated_at}

def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
return {"propertiesWithHistory": "dealstage"}


class Deals(CRMSearchStream):
"""Deals, API v3"""

entity = "deal"
last_modified_field = "hs_lastmodifieddate"
associations = ["contacts", "companies"]

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._stage_history = DealStageHistoryStream(**kwargs)

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
history_by_id = {}
for record in self._stage_history.read_records(sync_mode):
if all(field in record for field in ("id", "dealstage")):
history_by_id[record["id"]] = record["dealstage"]

for record in super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state):
if record.get("id") and int(record["id"]) in history_by_id:
record["dealstage"] = history_by_id[int(record["id"])]
yield record


class DealPipelines(Stream):
"""Deal pipelines, API v1,
Expand Down

0 comments on commit 159c313

Please sign in to comment.