Skip to content

Commit

Permalink
Fixed stream
Browse files Browse the repository at this point in the history
  • Loading branch information
lgomezm committed Mar 11, 2022
1 parent 1bf3c22 commit 46bb1ab
Showing 1 changed file with 49 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1014,13 +1014,62 @@ def request_params(
return params


class DealStageHistoryStream(Stream):
"""Deal stage history, API v1
Deal stage history is exposed by the v1 API, but not the v3 API.
The v1 endpoint requires the contacts scope.
Docs: https://legacydocs.hubspot.com/docs/methods/deals/get-all-deals
"""

url = "/deals/v1/deal/paged"
more_key = "hasMore"
data_field = "deals"
updated_at_field = "timestamp"

def _transform(self, records: Iterable) -> Iterable:
for record in super()._transform(records):
dealstage = record.get("properties", {}).get("dealstage", {})
updated_at = dealstage.get(self.updated_at_field)
if updated_at:
yield {"id": record.get("dealId"), "dealstage": dealstage, self.updated_at_field: updated_at}

def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
return {"propertiesWithHistory": "dealstage"}


class Deals(CRMSearchStream):
"""Deals, API v3"""

entity = "deal"
last_modified_field = "hs_lastmodifieddate"
associations = ["contacts", "companies"]

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._stage_history = DealStageHistoryStream(**kwargs)

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
history_by_id = {}
for record in self._stage_history.read_records(sync_mode):
if all(field in record for field in ("id", "dealstage")):
history_by_id[record["id"]] = record["dealstage"]

for record in super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state):
if record.get("id") and int(record["id"]) in history_by_id:
record["dealstage"] = history_by_id[int(record["id"])]
yield record


class DealPipelines(Stream):
"""Deal pipelines, API v1,
Expand Down

0 comments on commit 46bb1ab

Please sign in to comment.