Commit dd109de
Source Hubspot: implement new stream to read associations in incremental mode (#15099)

* #359 oncall - source hubspot: implement new stream to read associations in incremental mode

* #359 source hubspot: upd changelog

* #359 source hubspot: do not pass identifiers

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
davydov-d and octavia-squidington-iii authored Jul 28, 2022
1 parent 64572d9 commit dd109de
Showing 9 changed files with 147 additions and 10 deletions.
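
At a glance: incremental syncs use HubSpot's Search API, which returns records without their associations, while full-refresh reads already inline them via `_flat_associations`. The new `AssociationsStream` closes that gap by batch-reading association IDs and merging them into the records. A minimal, illustrative sketch of the resulting record shape (names and IDs are made up, not taken from the diff):

```python
# Illustrative only: a "companies" record emitted by an incremental sync after
# this commit. _read_associations adds one key per association type ("contacts"
# here), holding the IDs of the associated objects.
record = {
    "id": "4237295321",
    "properties": {"name": "Acme Inc."},
    "contacts": [551, 1052],
}
```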
@@ -414,7 +414,7 @@
 - name: HubSpot
   sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
   dockerRepository: airbyte/source-hubspot
-  dockerImageTag: 0.1.77
+  dockerImageTag: 0.1.78
   documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot
   icon: hubspot.svg
   sourceType: api
@@ -3705,7 +3705,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-hubspot:0.1.77"
+- dockerImage: "airbyte/source-hubspot:0.1.78"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot"
     connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
@@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.77
+LABEL io.airbyte.version=0.1.78
 LABEL io.airbyte.name=airbyte/source-hubspot
@@ -0,0 +1,13 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json

import pytest


@pytest.fixture(scope="session", name="config")
def config_fixture():
    with open("secrets/config.json", "r") as config_file:
        return json.load(config_file)
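
This fixture loads credentials from `secrets/config.json`. A sketch of that file's shape, expressed as a Python dict and assuming API-key authentication; the exact `credentials_title` literal is an assumption inferred from the `API_KEY_CREDENTIALS` check in `streams.py`:

```python
# Assumed shape of secrets/config.json (values are placeholders).
config = {
    "start_date": "2021-01-10T00:00:00Z",  # now required: see get_common_params below
    "credentials": {
        "credentials_title": "API Key Credentials",  # assumed literal behind API_KEY_CREDENTIALS
        "api_key": "<your-hubspot-api-key>",
    },
}
```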
@@ -0,0 +1,55 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging

import pytest
from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type
from source_hubspot.source import SourceHubspot


@pytest.fixture
def source():
    return SourceHubspot()


@pytest.fixture
def associations(config, source):
    streams = source.streams(config)
    return {stream.name: getattr(stream, "associations", []) for stream in streams}


@pytest.fixture
def configured_catalog(config, source):
    streams = source.streams(config)
    return {
        "streams": [
            {
                "stream": stream.as_airbyte_stream(),
                "sync_mode": "incremental",
                "cursor_field": [stream.cursor_field],
                "destination_sync_mode": "append",
            }
            for stream in streams
            if stream.supports_incremental and getattr(stream, "associations", [])
        ]
    }


def test_incremental_read_fetches_associations(config, configured_catalog, source, associations):
    messages = source.read(logging.getLogger("airbyte"), config, ConfiguredAirbyteCatalog.parse_obj(configured_catalog), {})

    association_found = False
    for message in messages:
        if message and message.type != Type.RECORD:
            continue
        record = message.record
        stream, data = record.stream, record.data
        # assume at least one association id is present
        stream_associations = associations[stream]
        for association in stream_associations:
            if data.get(association):
                association_found = True
                break
    assert association_found
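
For reference, each entry produced by the `configured_catalog` fixture above is a plain-dict `ConfiguredAirbyteStream`; roughly the following, with the JSON schema abbreviated and the cursor field name illustrative:

```python
# Approximate shape of one configured_catalog entry (illustrative values).
configured_stream = {
    "stream": {
        "name": "companies",
        "json_schema": {},  # abbreviated
        "supported_sync_modes": ["full_refresh", "incremental"],
    },
    "sync_mode": "incremental",
    "cursor_field": ["updatedAt"],  # illustrative cursor field name
    "destination_sync_mode": "append",
}
```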
@@ -82,7 +82,7 @@ def get_api(config: Mapping[str, Any]) -> API:
         return API(credentials=credentials)
 
     def get_common_params(self, config) -> Mapping[str, Any]:
-        start_date = config.get("start_date")
+        start_date = config["start_date"]
         credentials = config["credentials"]
         api = self.get_api(config=config)
         common_params = dict(api=api, start_date=start_date, credentials=credentials)
@@ -245,12 +245,15 @@ def _property_wrapper(self) -> IURLPropertyRepresentation:
             return APIv1Property(properties)
         return APIv3Property(properties)
 
-    def __init__(self, api: API, start_date: str = None, credentials: Mapping[str, Any] = None, **kwargs):
+    def __init__(self, api: API, start_date: Union[str, pendulum.datetime], credentials: Mapping[str, Any] = None, **kwargs):
         super().__init__(**kwargs)
         self._api: API = api
-        self._start_date = pendulum.parse(start_date)
         self._credentials = credentials
 
-        if credentials["credentials_title"] == API_KEY_CREDENTIALS:
+        self._start_date = start_date
+        if isinstance(self._start_date, str):
+            self._start_date = pendulum.parse(self._start_date)
+        if self._credentials["credentials_title"] == API_KEY_CREDENTIALS:
             self._session.params["hapikey"] = credentials.get("api_key")
 
     def backoff_time(self, response: requests.Response) -> Optional[float]:
@@ -642,6 +645,51 @@ def _flat_associations(self, records: Iterable[MutableMapping]) -> Iterable[MutableMapping]:
            yield record


class AssociationsStream(Stream):
    """
    Designed to read associations of CRM objects during incremental syncs,
    since the Search API does not support retrieving associations.
    """

    http_method = "POST"
    filter_old_records = False

    def __init__(self, parent_stream: Stream, identifiers: Iterable[Union[int, str]], *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parent_stream = parent_stream
        self.identifiers = identifiers

    @property
    def url(self):
        """
        Although this property is unused, it must be implemented because it is abstract in the base class.
        """
        return ""

    def path(
        self,
        *,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return f"/crm/v4/associations/{self.parent_stream.entity}/{stream_slice}/batch/read"

    def scopes(self) -> Set[str]:
        return self.parent_stream.scopes

    def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[str]:
        return self.parent_stream.associations

    def request_body_json(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Optional[Mapping]:
        return {"inputs": [{"id": str(id_)} for id_ in self.identifiers]}
class IncrementalStream(Stream, ABC):
    """Stream that supports state and incremental read"""

@@ -807,6 +855,24 @@ def _process_search(

        return list(stream_records.values()), raw_response

    def _read_associations(self, records: Iterable) -> Iterable[Mapping[str, Any]]:
        records_by_pk = {record[self.primary_key]: record for record in records}
        identifiers = list(map(lambda x: x[self.primary_key], records))
        associations_stream = AssociationsStream(
            api=self._api, start_date=self._start_date, credentials=self._credentials, parent_stream=self, identifiers=identifiers
        )
        slices = associations_stream.stream_slices(sync_mode=SyncMode.full_refresh)

        for _slice in slices:
            logger.debug(f"Reading {_slice} associations of {self.entity}")
            associations = associations_stream.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh)
            for group in associations:
                current_record = records_by_pk[group["from"]["id"]]
                associations_list = current_record.get(_slice, [])
                associations_list.extend(association["toObjectId"] for association in group["to"])
                current_record[_slice] = associations_list
        return records_by_pk.values()

    def read_records(
        self,
        sync_mode: SyncMode,
@@ -826,15 +892,15 @@ def read_records(
                     stream_state=stream_state,
                     stream_slice=stream_slice,
                 )
-
+                records = self._read_associations(records)
             else:
                 records, raw_response = self._read_stream_records(
                     stream_slice=stream_slice,
                     stream_state=stream_state,
                     next_page_token=next_page_token,
                 )
+                records = self._flat_associations(records)
             records = self._filter_old_records(records)
-            records = self._flat_associations(records)
 
             for record in records:
                 cursor = self._field_to_datetime(record[self.updated_at_field])
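
To make the new stream's wire traffic concrete, here is a standalone sketch (not connector code) of the HubSpot v4 batch-read call that `AssociationsStream` drives. The endpoint path and request body mirror `path()` and `request_body_json()` above; the base URL and bearer-token auth are assumptions for this sketch:

```python
import requests

BASE_URL = "https://api.hubapi.com"  # assumed HubSpot API base URL


def read_associations(token: str, from_type: str, to_type: str, ids: list) -> dict:
    """POST the same request body that AssociationsStream builds: one input per record ID."""
    response = requests.post(
        f"{BASE_URL}/crm/v4/associations/{from_type}/{to_type}/batch/read",
        headers={"Authorization": f"Bearer {token}"},  # assumed auth scheme
        json={"inputs": [{"id": str(id_)} for id_ in ids]},
    )
    response.raise_for_status()
    # Each result group links one source object to its associated object IDs,
    # e.g. {"from": {"id": "1"}, "to": [{"toObjectId": 551}]}; this is exactly
    # what _read_associations consumes when merging IDs back into records.
    return response.json()
```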
@@ -50,7 +50,7 @@ def test_check_connection_empty_config(config):
 def test_check_connection_invalid_config(config):
     config.pop("start_date")
 
-    with pytest.raises(TypeError):
+    with pytest.raises(KeyError):
         SourceHubspot().check_connection(logger, config=config)


@@ -406,6 +406,8 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(requ
requests_mock.register_uri("POST", test_stream.url, responses)
test_stream._sync_mode = None
requests_mock.register_uri("GET", "/properties/v2/company/properties", properties_response)
requests_mock.register_uri("POST", "/crm/v4/associations/company/contacts/batch/read", [{"status_code": 200, "json": {"results": []}}])

records, _ = read_incremental(test_stream, {})
# The stream should not attempt to get more than 10K records.
# Instead, it should use the new state to start a new search query.
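
The mock registered above returns an empty `results` list so this unit test can run unchanged. For comparison, a non-empty batch-read response, as `_read_associations` consumes it via `group["from"]["id"]` and `group["to"][i]["toObjectId"]`, would look roughly like this:

```python
# Approximate non-empty batch-read response body, inferred from how the
# connector consumes it; extra fields HubSpot may return are omitted here.
response_json = {
    "results": [
        {
            "from": {"id": "4237295321"},
            "to": [{"toObjectId": 551}, {"toObjectId": 1052}],
        }
    ]
}
```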
1 change: 1 addition & 0 deletions docs/integrations/sources/hubspot.md
@@ -129,6 +129,7 @@ Now that you have set up the HubSpot source connector, check out the following H

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------|
+| 0.1.78 | 2022-07-28 | [15099](https://github.com/airbytehq/airbyte/pull/15099) | Fix to fetch associations when using incremental mode |
| 0.1.77 | 2022-07-26 | [15035](https://github.com/airbytehq/airbyte/pull/15035) | Make PropertyHistory stream read historic data not limited to 30 days |
| 0.1.76 | 2022-07-25 | [14999](https://github.com/airbytehq/airbyte/pull/14999) | Partially revert changes made in v0.1.75 |
| 0.1.75 | 2022-07-18 | [14744](https://github.com/airbytehq/airbyte/pull/14744) | Remove override of private CDK method |
