Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Hubspot: Support incremental sync on all possible streams #8699

Merged
merged 12 commits into from
Dec 17, 2021
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.26
LABEL io.airbyte.version=0.1.27
LABEL io.airbyte.name=airbyte/source-hubspot
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ tests:
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "integration_tests/invalid_config_oauth.json"
status: "exception"
- config_path: "integration_tests/invalid_config_wrong_title.json"
status: "exception"
# The two tests below are commented out because validation returns a 'fatal' status for both of them,
# and acceptance-test-config does not support the 'fatal' status.
# - config_path: "integration_tests/invalid_config_oauth.json"
# status: "exception"
# - config_path: "integration_tests/invalid_config_wrong_title.json"
# status: "exception"
discovery:
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
- config_path: "secrets/config.json"
basic_read:
Expand All @@ -26,14 +28,14 @@ tests:
# and therefore the start date is set at 2021-10-10 for `config_oauth.json`,
# but the campaign was created on 2021-01-11
empty_streams: ["campaigns", "workflows"]
# incremental: fixme (eugene): '<=' not supported between instances of 'int' and 'str'
# See https://github.com/airbytehq/airbyte/issues/6509
# - config_path: "secrets/config.json"
# configured_catalog_path: "sample_files/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
# cursor_paths:
# subscription_changes: ["timestamp"]
# email_events: ["timestamp"]
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "sample_files/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
cursor_paths:
subscription_changes: ["timestamp"]
email_events: ["timestamp"]
contact_lists: ["timestamp"]
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "sample_files/full_refresh_catalog.json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,32 @@
{
"companies": {
"updatedAt": "2221-02-23T00:00:00Z"
},
"contact_lists": {
"timestamp": 4770964127000
},
"contacts": {
"updatedAt": "2221-02-23T00:00:00Z"
},
"deals": {
"updatedAt": "2221-02-23T00:00:00Z"
},
"email_events": {
"timestamp": "2121-03-19T17:00:45.743000+00:00"
"timestamp": 4770964127000
},
"line_items": {
"updatedAt": "2221-02-23T00:00:00Z"
},
"products": {
"updatedAt": "2221-02-23T00:00:00Z"
},
"quotes": {
"updatedAt": "2221-02-23T00:00:00Z"
},
"subscription_changes": {
"timestamp": "2121-03-19T16:58:54.301000+00:00"
"timestamp": 4770964127000
},
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
"tickets": {
"updatedAt": "2221-02-23T00:00:00Z"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,37 @@
"stream": {
"name": "companies",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "contact_lists",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "contacts",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
Expand All @@ -49,10 +58,13 @@
"stream": {
"name": "deals",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
Expand Down Expand Up @@ -88,10 +100,13 @@
"stream": {
"name": "line_items",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
Expand All @@ -118,19 +133,25 @@
"stream": {
"name": "products",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "quotes",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
Expand All @@ -148,10 +169,13 @@
"stream": {
"name": "tickets",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updatedAt"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"cursor_field": ["updatedAt"],
"destination_sync_mode": "append"
},
{
"stream": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,32 @@
{
"subscription_changes": {
"timestamp": "2021-02-23T00:00:00Z"
"companies": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"contact_lists": {
"updatedAt": 1639058042
},
"contacts": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"deals": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"email_events": {
"timestamp": "2021-02-23T00:00:00Z"
"timestamp": 1639058042
},
"line_items": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"products": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"quotes": {
"updatedAt": "2021-02-23T00:00:00Z"
},
"subscription_changes": {
"timestamp": 1639058042
},
"tickets": {
"updatedAt": "2021-02-23T00:00:00Z"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ class IncrementalStream(Stream, ABC):

state_pk = "timestamp"
limit = 1000
need_chunk = True

lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
@property
@abstractmethod
Expand All @@ -446,12 +447,15 @@ def updated_at_field(self):
def state(self) -> Optional[Mapping[str, Any]]:
"""Current state; returns None if it has not been set."""
if self._state:
return {self.state_pk: str(self._state)}
return (
{self.state_pk: int(self._state.timestamp() * 1000)} if self.state_pk == "timestamp" else {self.state_pk: str(self._state)}
)
return None

@state.setter
def state(self, value):
self._state = pendulum.parse(value[self.state_pk])
state = value[self.state_pk]
self._state = pendulum.parse(str(pendulum.from_timestamp(state / 1000))) if isinstance(state, int) else pendulum.parse(state)
self._start_date = max(self._state, self._start_date)

def __init__(self, *args, **kwargs):
Expand All @@ -477,12 +481,13 @@ def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
self._start_date = self._state

def read_chunked(
self, getter: Callable, params: Mapping[str, Any] = None, chunk_size: pendulum.duration = pendulum.duration(days=1)
self, getter: Callable, params: Mapping[str, Any] = None, chunk_size: pendulum.duration = pendulum.duration(days=30)
) -> Iterator:
params = {**params} if params else {}
now_ts = int(pendulum.now().timestamp() * 1000)
start_ts = int(self._start_date.timestamp() * 1000)
chunk_size = int(chunk_size.total_seconds() * 1000)
max_delta = now_ts - start_ts
chunk_size = int(chunk_size.total_seconds() * 1000) if self.need_chunk else max_delta

for ts in range(start_ts, now_ts, chunk_size):
end_ts = ts + chunk_size
Expand Down Expand Up @@ -553,6 +558,12 @@ def _flat_associations(self, records: Iterable[MutableMapping]) -> Iterable[Muta
yield record


class CRMObjectIncrementalStream(CRMObjectStream, IncrementalStream):
state_pk = "updatedAt"
limit = 100
need_chunk = False


class CampaignStream(Stream):
"""Email campaigns, API v1
There is some confusion between emails and campaigns in docs, this endpoint returns actual emails
Expand All @@ -571,7 +582,7 @@ def list(self, fields) -> Iterable:
yield {**row, **record}


class ContactListStream(Stream):
class ContactListStream(IncrementalStream):
"""Contact lists, API v1
Docs: https://legacydocs.hubspot.com/docs/methods/lists/get_lists
"""
Expand All @@ -582,6 +593,7 @@ class ContactListStream(Stream):
updated_at_field = "updatedAt"
created_at_field = "createdAt"
limit_field = "count"
need_chunk = False


class DealStageHistoryStream(Stream):
Expand All @@ -608,7 +620,7 @@ def list(self, fields) -> Iterable:
yield from self.read(partial(self._api.get, url=self.url), params)


class DealStream(CRMObjectStream):
class DealStream(CRMObjectIncrementalStream):
"""Deals, API v3"""

def __init__(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
API,
CampaignStream,
ContactListStream,
CRMObjectStream,
CRMObjectIncrementalStream,
DealPipelineStream,
DealStream,
EmailEventStream,
Expand All @@ -35,26 +35,26 @@ def __init__(self, start_date, credentials, **kwargs):
common_params = dict(api=self._api, start_date=self._start_date)
self._apis = {
"campaigns": CampaignStream(**common_params),
"companies": CRMObjectStream(entity="company", associations=["contacts"], **common_params),
"companies": CRMObjectIncrementalStream(entity="company", associations=["contacts"], **common_params),
"contact_lists": ContactListStream(**common_params),
"contacts": CRMObjectStream(entity="contact", **common_params),
"contacts": CRMObjectIncrementalStream(entity="contact", **common_params),
"deal_pipelines": DealPipelineStream(**common_params),
"deals": DealStream(associations=["contacts"], **common_params),
"email_events": EmailEventStream(**common_params),
"engagements": EngagementStream(**common_params),
"forms": FormStream(**common_params),
"line_items": CRMObjectStream(entity="line_item", **common_params),
"line_items": CRMObjectIncrementalStream(entity="line_item", **common_params),
"marketing_emails": MarketingEmailStream(**common_params),
"owners": OwnerStream(**common_params),
"products": CRMObjectStream(entity="product", **common_params),
"products": CRMObjectIncrementalStream(entity="product", **common_params),
"subscription_changes": SubscriptionChangeStream(**common_params),
"tickets": CRMObjectStream(entity="ticket", **common_params),
"tickets": CRMObjectIncrementalStream(entity="ticket", **common_params),
"workflows": WorkflowStream(**common_params),
}

credentials_title = credentials.get("credentials_title")
if credentials_title == "API Key Credentials":
self._apis["quotes"] = CRMObjectStream(entity="quote", **common_params)
self._apis["quotes"] = CRMObjectIncrementalStream(entity="quote", **common_params)

super().__init__(**kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,12 @@
}
},
"createdAt": {
"type": ["null", "string"]
"type": ["null", "string"],
"format": "date-time"
},
"updatedAt": {
"type": ["null", "string"]
"type": ["null", "string"],
"format": "date-time"
},
"archived": {
"type": ["null", "boolean"]
Expand Down
Loading