Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Hubspot: best practices #2537

Merged
merged 4 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,30 @@
SOFTWARE.
"""

# format anchor
import pytest
from source_hubspot.client import Client
from source_hubspot.errors import HubspotInvalidAuth


def test_example_method():
    """Placeholder sanity test; always passes."""
    assert True
@pytest.fixture(name="wrong_credentials")
def wrong_credentials_fixture():
    """Provide intentionally invalid HubSpot credentials for auth-failure tests."""
    invalid_credentials = {"api_key": "wrong_key"}
    return invalid_credentials


def test__health_check_with_wrong_token(wrong_credentials):
    """Health check with a bad API key must report not-alive and surface the auth error."""
    client = Client(start_date="2021-02-01T00:00:00Z", credentials=wrong_credentials)
    alive, error = client.health_check()

    expected_error = "HubspotInvalidAuth('The API key provided is invalid. View or manage your API key here: https://app.hubspot.com/l/api-key/')"
    assert not alive
    assert error == expected_error


def test__stream_iterator_with_wrong_token(wrong_credentials):
    """Enumerating streams with a bad API key must raise HubspotInvalidAuth."""
    client = Client(start_date="2021-02-01T00:00:00Z", credentials=wrong_credentials)
    expected_message = "The API key provided is invalid. View or manage your API key here: https://app.hubspot.com/l/api-key/"
    with pytest.raises(HubspotInvalidAuth, match=expected_message):
        _ = list(client.streams)
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,18 @@

import json
from pathlib import Path
from typing import List, Tuple, Any, MutableMapping, Mapping, Iterable
from typing import Any, Iterable, List, Mapping, MutableMapping, Tuple

from airbyte_protocol import ConfiguredAirbyteCatalog, Type, SyncMode
import pytest
from airbyte_protocol import ConfiguredAirbyteCatalog, SyncMode, Type
from base_python import AirbyteLogger
from source_hubspot.source import SourceHubspot
import pytest


HERE = Path(__file__).parent.absolute()


@pytest.fixture(scope="session")
def config() -> Mapping[str, Any]:
@pytest.fixture(scope="session", name="config")
def config_fixture() -> Mapping[str, Any]:
config_filename = HERE.parent / "secrets" / "config.json"

if not config_filename.exists():
Expand Down Expand Up @@ -67,35 +66,42 @@ def configured_catalog_with_incremental(configured_catalog) -> ConfiguredAirbyte
return configured_catalog


def read_stream(
    source: SourceHubspot, config: Mapping, catalog: ConfiguredAirbyteCatalog, state: MutableMapping = None
) -> Tuple[Mapping, List]:
    """Run a full read of the source and collect its output.

    Returns a tuple of (records, states) where `records` maps stream name to
    the list of record messages emitted for that stream, and `states` is the
    list of state messages in emission order.
    """
    records = {}
    states = []
    for message in source.read(AirbyteLogger(), config, catalog, state):
        if message.type == Type.RECORD:
            # Group records per stream so tests can inspect each stream's cursor.
            records.setdefault(message.record.stream, []).append(message.record)
        elif message.type == Type.STATE:
            states.append(message.state)
            print(message.state.data)

    return records, states


def records_older(records: Iterable, than: int, cursor_field: str) -> Iterable:
    """Yield records whose `cursor_field` value is strictly older than `than`.

    Records missing the cursor field (or carrying None) are skipped instead of
    raising a TypeError from the `None < than` comparison.
    """
    for record in records:
        cursor_value = record.data.get(cursor_field)
        if cursor_value is not None and cursor_value < than:
            yield record


class TestIncrementalSync:
    """Incremental sync scenarios for the HubSpot source."""

    def test_sync_with_latest_state(self, config, configured_catalog_with_incremental):
        """Sync first time, save the state and sync second time with saved state from previous sync"""
        # Map stream name -> configured stream so we can look up each cursor field.
        streams = {stream.stream.name: stream for stream in configured_catalog_with_incremental.streams}
        records1, states1 = read_stream(SourceHubspot(), config, configured_catalog_with_incremental)

        assert states1, "should have at least one state emitted"
        assert records1, "should have at least few records emitted"

        # Second sync resumes from the last state emitted by the first sync.
        records2, states2 = read_stream(SourceHubspot(), config, configured_catalog_with_incremental, states1[-1].data)

        assert states1[-1] == states2[-1], "final states should be the same"
        for stream_name in states2[-1].data:
            cursor_field = streams[stream_name].cursor_field[0]
            boundary = records2[stream_name][0].data[cursor_field]
            old_records1 = records_older(records1[stream_name], than=boundary, cursor_field=cursor_field)
            old_records2 = records_older(records2[stream_name], than=boundary, cursor_field=cursor_field)
            assert list(old_records1), "should have older records from the first read"
            assert not list(old_records2), "should not have older records from the second read"
Original file line number Diff line number Diff line change
Expand Up @@ -4115,10 +4115,10 @@
},
"supported_sync_modes": ["incremental", "full_refresh"],
"source_defined_cursor": true,
"default_cursor_field": null
"default_cursor_field": ["created"]
},
"sync_mode": "incremental",
"cursor_field": null
"cursor_field": ["created"]
},
{
"stream": {
Expand Down Expand Up @@ -5737,10 +5737,10 @@
},
"supported_sync_modes": ["incremental", "full_refresh"],
"source_defined_cursor": true,
"default_cursor_field": null
"default_cursor_field": ["timestamp"]
},
"sync_mode": "incremental",
"cursor_field": null
"cursor_field": ["timestamp"]
},
{
"stream": {
Expand Down
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-hubspot/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

# Extra dependencies needed only when running the connector's test suite.
TEST_REQUIREMENTS = [
    "pytest",
    "requests_mock==1.8.0",
]

setup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import pendulum as pendulum
import requests
from base_python.entrypoint import logger
from source_hubspot.errors import HubspotInvalidAuth, HubspotRateLimited, HubspotSourceUnavailable
from source_hubspot.errors import HubspotAccessDenied, HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout


def retry_connection_handler(**kwargs):
Expand All @@ -45,7 +45,7 @@ def log_retry_attempt(details):
logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} more seconds then retrying...")

def giveup_handler(exc):
    """Stop retrying on auth/permission failures and any other 4XX response."""
    if isinstance(exc, (HubspotInvalidAuth, HubspotAccessDenied)):
        return True
    # Any client error (4XX) is not retryable; server errors fall through to retry.
    return exc.response is not None and 400 <= exc.response.status_code < 500

Expand Down Expand Up @@ -146,17 +146,23 @@ def _add_auth(self, params: Mapping[str, Any] = None) -> Mapping[str, Any]:
@staticmethod
def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
"""Handle response"""
message = "Unknown error"
if response.headers.get("content-type") == "application/json;charset=utf-8" and response.status_code != 200:
message = response.json().get("message")

if response.status_code == 403:
raise HubspotSourceUnavailable(response.content)
raise HubspotAccessDenied(message, response=response)
elif response.status_code == 401:
raise HubspotInvalidAuth(response.content)
raise HubspotInvalidAuth(message, response=response)
elif response.status_code == 429:
retry_after = response.headers.get("Retry-After")
raise HubspotRateLimited(
f"429 Rate Limit Exceeded: API rate-limit has been reached until {retry_after} seconds."
" See https://developers.hubspot.com/docs/api/usage-details",
response=response,
)
elif response.status_code in (502, 504):
raise HubspotTimeout(message, response=response)
else:
response.raise_for_status()

Expand All @@ -177,7 +183,8 @@ class Stream(ABC):
"""Base class for all streams. Responsible for data fetching and pagination"""

entity: str = None
updated_at_fields: List[str] = []
updated_at_field: str = None
created_at_field: str = None

more_key: str = None
data_field = "results"
Expand Down Expand Up @@ -218,7 +225,11 @@ def _filter_dynamic_fields(self, records: Iterable) -> Iterable:
yield record

def _transform(self, records: Iterable) -> Iterable:
    """Preprocess record before emitting.

    Backfills a missing updated-at value from the created-at value so the
    downstream cursor logic always has a comparable value.
    """
    for record in records:
        if self.created_at_field and self.updated_at_field and record.get(self.updated_at_field) is None:
            record[self.updated_at_field] = record[self.created_at_field]
        yield record

@staticmethod
def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime:
Expand All @@ -233,18 +244,13 @@ def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime:
def _filter_old_records(self, records: Iterable) -> Iterable:
    """Skip records that were updated before our start_date."""
    for record in records:
        updated_at = record[self.updated_at_field]
        if updated_at:
            updated_at = self._field_to_datetime(updated_at)
            # Drop anything older than the configured start date.
            if updated_at < self._start_date:
                continue
        yield record

def _record_bookmark(self, record: Mapping[str, Any]) -> Any:
for field_name in self.updated_at_fields:
if record.get(field_name) is not None:
return record.get(field_name)

def _read(self, getter: Callable, params: MutableMapping[str, Any] = None) -> Iterator:
while True:
response = getter(params=params)
Expand Down Expand Up @@ -306,8 +312,8 @@ class IncrementalStream(Stream, ABC):

@property
@abstractmethod
def updated_at_field(self):
    """Name of the field associated with the state"""

@property
def state(self) -> Optional[Mapping[str, Any]]:
Expand All @@ -328,12 +334,12 @@ def __init__(self, *args, **kwargs):
def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
"""Apply state filter to set of records, update cursor(state) if necessary in the end"""
latest_cursor = None
# to track state, there is no guarantee that returned records sorted in ascending order. Having exact
# boundary we could always ensure we don't miss records between states. In the future, if we would
# to track state, there is no guarantee that returned records sorted in ascending order. Having exact
# boundary we could always ensure we don't miss records between states. In the future, if we would
# like to save the state more often we can do this every batch
for record in self.read_chunked(getter, params):
yield record
cursor = self._field_to_datetime(self._record_bookmark(record))
cursor = self._field_to_datetime(record[self.updated_at_field])
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor

if latest_cursor:
Expand Down Expand Up @@ -372,7 +378,8 @@ class CRMObjectStream(Stream):

entity: Optional[str] = None
associations: List[str] = []
updated_at_fields = ["updatedAt", "createdAt"]
updated_at_field = "updatedAt"
created_at_field = "createdAt"

@property
def url(self):
Expand Down Expand Up @@ -427,7 +434,7 @@ class CampaignStream(Stream):
more_key = "hasMore"
data_field = "campaigns"
limit = 500
updated_at_field = ["lastUpdatedTime"]
updated_at_field = "lastUpdatedTime"

def list(self, fields) -> Iterable:
for row in self.read(getter=partial(self._api.get, url=self.url)):
Expand All @@ -443,7 +450,8 @@ class ContactListStream(Stream):
url = "/contacts/v1/lists"
data_field = "lists"
more_key = "has-more"
updated_at_fields = ["updatedAt", "createdAt"]
updated_at_field = "updatedAt"
created_at_field = "createdAt"
limit_field = "count"


Expand All @@ -454,7 +462,8 @@ class DealPipelineStream(Stream):
"""

url = "/crm-pipelines/v1/pipelines/deals"
updated_at_fields = ["updatedAt", "createdAt"]
updated_at_field = "updatedAt"
created_at_field = "createdAt"


class TicketPipelineStream(Stream):
Expand All @@ -464,7 +473,8 @@ class TicketPipelineStream(Stream):
"""

url = "/crm-pipelines/v1/pipelines/tickets"
updated_at_fields = ["updatedAt", "createdAt"]
updated_at_field = "updatedAt"
created_at_field = "createdAt"


class EmailEventStream(IncrementalStream):
Expand All @@ -475,7 +485,8 @@ class EmailEventStream(IncrementalStream):
url = "/email/public/v1/events"
data_field = "events"
more_key = "hasMore"
updated_at_fields = ["created"]
updated_at_field = "created"
created_at_field = "created"


class EngagementStream(Stream):
Expand All @@ -487,11 +498,11 @@ class EngagementStream(Stream):
url = "/engagements/v1/engagements/paged"
more_key = "hasMore"
limit = 250
updated_at_fields = ["lastUpdated", "createdAt"]
updated_at_field = "lastUpdated"
created_at_field = "createdAt"

def _transform(self, records: Iterable) -> Iterable:
for record in records:
yield {**record.pop("engagement"), **record}
yield from super()._transform({**record.pop("engagement"), **record} for record in records)


class FormStream(Stream):
Expand All @@ -502,7 +513,8 @@ class FormStream(Stream):

entity = "form"
url = "/forms/v2/forms"
updated_at_fields = ["updatedAt", "createdAt"]
updated_at_field = "updatedAt"
created_at_field = "createdAt"


class OwnerStream(Stream):
Expand All @@ -511,7 +523,8 @@ class OwnerStream(Stream):
"""

url = "/crm/v3/owners"
updated_at_fields = ["updatedAt", "createdAt"]
updated_at_field = "updatedAt"
created_at_field = "createdAt"


class SubscriptionChangeStream(IncrementalStream):
Expand All @@ -522,7 +535,7 @@ class SubscriptionChangeStream(IncrementalStream):
url = "/email/public/v1/subscriptions/timeline"
data_field = "timeline"
more_key = "hasMore"
updated_at_fields = ["timestamp"]
updated_at_field = "timestamp"


class WorkflowStream(Stream):
Expand All @@ -532,4 +545,5 @@ class WorkflowStream(Stream):

url = "/automation/v3/workflows"
data_field = "workflows"
updated_at_fields = ["updatedAt", "createdAt"]
updated_at_field = "updatedAt"
created_at_field = "insertedAt"
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def streams(self) -> Iterator[AirbyteStream]:
properties = self._apis[stream.name].properties
if properties:
stream.json_schema["properties"]["properties"] = {"type": "object", "properties": properties}
stream.default_cursor_field = [self._apis[stream.name].updated_at_field]
yield stream

def stream_has_state(self, name: str) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ class HubspotError(HTTPError):
"""


class HubspotSourceUnavailable(HubspotError):
""""""


class HubspotTimeout(HubspotError):
    """502/504 HubSpot has processing limits in place to prevent a single client from causing degraded performance,
    and these responses indicate that those limits have been hit. You'll normally only see these timeout responses
    when making a large number of requests over a sustained period. If you get one of these responses,
    you should pause your requests for a few seconds, then retry.
    """


class HubspotInvalidAuth(HubspotError):
Expand Down
Loading