Skip to content

Commit

Permalink
🎉 Source Hubspot: Support incremental sync on all possible streams (a…
Browse files Browse the repository at this point in the history
…irbytehq#8699)

* Add incremental sync support for companies, contact_lists, contacts, deals, line_items, products, quotes, tickets streams

* Updated PR number

* Fix formatting

* Fix typo

* Commented out unsupported tests

* Updated per review feedback

* Updated abnormal state file

* Deleted comment

* Updated version in docker

* Updated version in spec yaml and seed
  • Loading branch information
lazebnyi authored and schlattk committed Jan 4, 2022
1 parent f5cb7bf commit 9524a9b
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "36c891d9-4bd9-43ac-bad2-10e12756272c",
"name": "HubSpot",
"dockerRepository": "airbyte/source-hubspot",
"dockerImageTag": "0.1.28",
"dockerImageTag": "0.1.29",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/hubspot",
"icon": "hubspot.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@
- name: HubSpot
sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerRepository: airbyte/source-hubspot
dockerImageTag: 0.1.28
dockerImageTag: 0.1.29
documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot
icon: hubspot.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2793,7 +2793,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-hubspot:0.1.28"
- dockerImage: "airbyte/source-hubspot:0.1.29"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.28
LABEL io.airbyte.version=0.1.29
LABEL io.airbyte.name=airbyte/source-hubspot
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ tests:
# and therefore the start date is set at 2021-10-10 for `config_oauth.json`,
# but the campaign was created on 2021-01-11
empty_streams: ["campaigns", "workflows"]
# incremental: fixme (eugene): '<=' not supported between instances of 'int' and 'str'
# See https://github.com/airbytehq/airbyte/issues/6509
# - config_path: "secrets/config.json"
# configured_catalog_path: "sample_files/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
# cursor_paths:
# subscription_changes: ["timestamp"]
# email_events: ["timestamp"]
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "sample_files/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
cursor_paths:
subscription_changes: ["timestamp"]
email_events: ["timestamp"]
contact_lists: ["timestamp"]
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "sample_files/full_refresh_catalog.json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,32 @@
{
"companies": {
"updatedAt": "2221-10-12T13:37:56.412000+00:00"
},
"contact_lists": {
"timestamp": "2221-10-12T13:37:56.412000+00:00"
},
"contacts": {
"updatedAt": "2221-10-12T13:37:56.412000+00:00"
},
"deals": {
"updatedAt": "2221-10-12T13:37:56.412000+00:00"
},
"email_events": {
"timestamp": "2121-03-19T17:00:45.743000+00:00"
"timestamp": "2221-10-12T13:37:56.412000+00:00"
},
"line_items": {
"updatedAt": "2221-10-12T13:37:56.412000+00:00"
},
"products": {
"updatedAt": "2221-10-12T13:37:56.412000+00:00"
},
"quotes": {
"updatedAt": "2221-10-12T13:37:56.412000+00:00"
},
"subscription_changes": {
"timestamp": "2121-03-19T16:58:54.301000+00:00"
"timestamp": "2221-10-12T13:37:56.412000+00:00"
},
"tickets": {
"updatedAt": "2221-10-12T13:37:56.412000+00:00"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,37 @@
"stream": {
"name": "companies",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "contact_lists",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "contacts",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
Expand All @@ -49,10 +58,13 @@
"stream": {
"name": "deals",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
Expand Down Expand Up @@ -88,10 +100,13 @@
"stream": {
"name": "line_items",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
Expand All @@ -118,19 +133,25 @@
"stream": {
"name": "products",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "quotes",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
Expand All @@ -148,10 +169,13 @@
"stream": {
"name": "tickets",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,32 @@
{
"subscription_changes": {
"companies": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"contact_lists": {
"timestamp": "2021-02-23T00:00:00Z"
},
"contacts": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"deals": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"email_events": {
"timestamp": "2021-02-23T00:00:00Z"
},
"line_items": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"products": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"quotes": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"subscription_changes": {
"timestamp": "2021-02-23T00:00:00Z"
},
"tickets": {
"updatedAt": "2021-02-23T00:00:00Z"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ class IncrementalStream(Stream, ABC):

state_pk = "timestamp"
limit = 1000
# Flag that enables/disables chunked reading in the read_chunked method:
# False -> a single slice covering the whole range (start date to now), True -> slices of chunk_size (30 days by default)
need_chunk = True

@property
@abstractmethod
Expand All @@ -446,12 +449,15 @@ def updated_at_field(self):
def state(self) -> Optional[Mapping[str, Any]]:
"""Current state, if wasn't set return None"""
if self._state:
return {self.state_pk: str(self._state)}
return (
{self.state_pk: int(self._state.timestamp() * 1000)} if self.state_pk == "timestamp" else {self.state_pk: str(self._state)}
)
return None

@state.setter
def state(self, value):
self._state = pendulum.parse(value[self.state_pk])
state = value[self.state_pk]
self._state = pendulum.parse(str(pendulum.from_timestamp(state / 1000))) if isinstance(state, int) else pendulum.parse(state)
self._start_date = max(self._state, self._start_date)

def __init__(self, *args, **kwargs):
Expand All @@ -477,12 +483,13 @@ def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
self._start_date = self._state

def read_chunked(
self, getter: Callable, params: Mapping[str, Any] = None, chunk_size: pendulum.duration = pendulum.duration(days=1)
self, getter: Callable, params: Mapping[str, Any] = None, chunk_size: pendulum.duration = pendulum.duration(days=30)
) -> Iterator:
params = {**params} if params else {}
now_ts = int(pendulum.now().timestamp() * 1000)
start_ts = int(self._start_date.timestamp() * 1000)
chunk_size = int(chunk_size.total_seconds() * 1000)
max_delta = now_ts - start_ts
chunk_size = int(chunk_size.total_seconds() * 1000) if self.need_chunk else max_delta

for ts in range(start_ts, now_ts, chunk_size):
end_ts = ts + chunk_size
Expand Down Expand Up @@ -553,6 +560,12 @@ def _flat_associations(self, records: Iterable[MutableMapping]) -> Iterable[Muta
yield record


# CRM object stream with incremental-sync support: inherits from both
# CRMObjectStream and IncrementalStream and only overrides the class-level settings below.
class CRMObjectIncrementalStream(CRMObjectStream, IncrementalStream):
# Key under which the cursor is stored in the state. Because it is not "timestamp",
# the state property shown in this file serializes the cursor with str() instead of
# converting it to epoch milliseconds.
state_pk = "updatedAt"
# Overrides the inherited `limit` (IncrementalStream sets 1000).
# NOTE(review): presumably the per-request page size - confirm against the read logic.
limit = 100
# Disables chunking: with need_chunk False, read_chunked uses (now - start date) as the
# chunk size instead of the chunk_size argument.
need_chunk = False


class CampaignStream(Stream):
"""Email campaigns, API v1
There is some confusion between emails and campaigns in docs, this endpoint returns actual emails
Expand All @@ -571,7 +584,7 @@ def list(self, fields) -> Iterable:
yield {**row, **record}


class ContactListStream(Stream):
class ContactListStream(IncrementalStream):
"""Contact lists, API v1
Docs: https://legacydocs.hubspot.com/docs/methods/lists/get_lists
"""
Expand All @@ -582,6 +595,7 @@ class ContactListStream(Stream):
updated_at_field = "updatedAt"
created_at_field = "createdAt"
limit_field = "count"
need_chunk = False


class DealStageHistoryStream(Stream):
Expand All @@ -608,7 +622,7 @@ def list(self, fields) -> Iterable:
yield from self.read(partial(self._api.get, url=self.url), params)


class DealStream(CRMObjectStream):
class DealStream(CRMObjectIncrementalStream):
"""Deals, API v3"""

def __init__(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
API,
CampaignStream,
ContactListStream,
CRMObjectStream,
CRMObjectIncrementalStream,
DealPipelineStream,
DealStream,
EmailEventStream,
Expand All @@ -35,26 +35,26 @@ def __init__(self, start_date, credentials, **kwargs):
common_params = dict(api=self._api, start_date=self._start_date)
self._apis = {
"campaigns": CampaignStream(**common_params),
"companies": CRMObjectStream(entity="company", associations=["contacts"], **common_params),
"companies": CRMObjectIncrementalStream(entity="company", associations=["contacts"], **common_params),
"contact_lists": ContactListStream(**common_params),
"contacts": CRMObjectStream(entity="contact", **common_params),
"contacts": CRMObjectIncrementalStream(entity="contact", **common_params),
"deal_pipelines": DealPipelineStream(**common_params),
"deals": DealStream(associations=["contacts"], **common_params),
"email_events": EmailEventStream(**common_params),
"engagements": EngagementStream(**common_params),
"forms": FormStream(**common_params),
"line_items": CRMObjectStream(entity="line_item", **common_params),
"line_items": CRMObjectIncrementalStream(entity="line_item", **common_params),
"marketing_emails": MarketingEmailStream(**common_params),
"owners": OwnerStream(**common_params),
"products": CRMObjectStream(entity="product", **common_params),
"products": CRMObjectIncrementalStream(entity="product", **common_params),
"subscription_changes": SubscriptionChangeStream(**common_params),
"tickets": CRMObjectStream(entity="ticket", **common_params),
"tickets": CRMObjectIncrementalStream(entity="ticket", **common_params),
"workflows": WorkflowStream(**common_params),
}

credentials_title = credentials.get("credentials_title")
if credentials_title == "API Key Credentials":
self._apis["quotes"] = CRMObjectStream(entity="quote", **common_params)
self._apis["quotes"] = CRMObjectIncrementalStream(entity="quote", **common_params)

super().__init__(**kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,12 @@
}
},
"createdAt": {
"type": ["null", "string"]
"type": ["null", "string"],
"format": "date-time"
},
"updatedAt": {
"type": ["null", "string"]
"type": ["null", "string"],
"format": "date-time"
},
"archived": {
"type": ["null", "boolean"]
Expand Down
Loading

0 comments on commit 9524a9b

Please sign in to comment.