Source Hubspot: do not override _read_incremental (airbytehq#14744)
* airbytehq#14034 source hubspot: do not override _read_incremental

* airbytehq#14034 source hubspot: upd changelog

* airbytehq#14034 source hubspot: flake fix

* airbytehq#14034 source hubspot: update state after each record and fix incremental reads

* airbytehq#14034: fix formatting

* airbytehq#14034 source hubspot: rm unnecessary check

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
davydov-d and octavia-squidington-iii authored Jul 18, 2022
1 parent 92a45d9 commit ae0cf4c
Showing 7 changed files with 102 additions and 127 deletions.
@@ -406,7 +406,7 @@
- name: HubSpot
sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerRepository: airbyte/source-hubspot
-dockerImageTag: 0.1.74
+dockerImageTag: 0.1.75
documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot
icon: hubspot.svg
sourceType: api
@@ -3693,7 +3693,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-hubspot:0.1.74"
+- dockerImage: "airbyte/source-hubspot:0.1.75"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot"
connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
@@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.1.74
+LABEL io.airbyte.version=0.1.75
LABEL io.airbyte.name=airbyte/source-hubspot
@@ -10,9 +10,8 @@
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources import AbstractSource
-from airbyte_cdk.sources.deprecated.base_source import ConfiguredAirbyteStream
from airbyte_cdk.sources.streams import Stream
-from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
+from airbyte_cdk.sources.utils.schema_helpers import split_config
from airbyte_cdk.utils.event_timing import create_timer
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from requests import HTTPError
@@ -187,26 +186,3 @@ def read(
logger.info(timer.report())

logger.info(f"Finished syncing {self.name}")

-def _read_incremental(
-self,
-logger: logging.Logger,
-stream_instance: Stream,
-configured_stream: ConfiguredAirbyteStream,
-connector_state: MutableMapping[str, Any],
-internal_config: InternalConfig,
-) -> Iterator[AirbyteMessage]:
-"""
-This method is overridden to checkpoint the latest actual state,
-because stream state is refreshed after reading each batch of records (if need_chunk is True),
-or reading all records in the stream.
-"""
-yield from super()._read_incremental(
-logger=logger,
-stream_instance=stream_instance,
-configured_stream=configured_stream,
-connector_state=connector_state,
-internal_config=internal_config,
-)
-stream_state = stream_instance.get_updated_state(current_stream_state={}, latest_record={})
-yield self._checkpoint_state(stream_instance, stream_state, connector_state)
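
With the stream classes below now mixing in the CDK's IncrementalMixin, the stock AbstractSource._read_incremental already checkpoints the stream's state property as records are read, so this override becomes unnecessary. The following is a minimal sketch of that contract — not the connector's actual code; the record data and field names are made up:

from typing import Any, Iterable, Mapping, MutableMapping

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import IncrementalMixin


class SketchStream(Stream, IncrementalMixin):
    primary_key = "id"
    cursor_field = "updatedAt"

    def __init__(self):
        self._cursor_value = None

    @property
    def state(self) -> MutableMapping[str, Any]:
        # The CDK reads this getter when it emits state checkpoints.
        return {self.cursor_field: self._cursor_value} if self._cursor_value else {}

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        # The CDK calls this setter with the saved state before the sync starts.
        self._cursor_value = value.get(self.cursor_field)

    def read_records(self, sync_mode: SyncMode, **kwargs) -> Iterable[Mapping[str, Any]]:
        for record in [{"id": "1", "updatedAt": "2022-07-18T00:00:00Z"}]:  # placeholder data
            # Advance the cursor as each record is yielded, mirroring the
            # per-record _update_state() calls added in this commit.
            self._cursor_value = max(self._cursor_value or "", record[self.cursor_field])
            yield record

Because state is kept current while records are yielded, the source no longer has to re-derive it via get_updated_state after the read finishes.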
@@ -15,6 +15,7 @@
import requests
from airbyte_cdk.entrypoint import logger
from airbyte_cdk.models import SyncMode
+from airbyte_cdk.sources.streams.core import IncrementalMixin
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
@@ -642,7 +643,7 @@ def _flat_associations(self, records: Iterable[MutableMapping]) -> Iterable[Muta
yield record


-class IncrementalStream(Stream, ABC):
+class IncrementalStream(Stream, IncrementalMixin):
"""Stream that supports state and incremental read"""

state_pk = "timestamp"
@@ -656,6 +657,10 @@ class IncrementalStream(Stream, ABC):
def cursor_field(self) -> Union[str, List[str]]:
return self.updated_at_field

+@property
+def is_incremental_sync(self):
+return self._sync_mode == SyncMode.incremental

@property
@abstractmethod
def updated_at_field(self):
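
The new is_incremental_sync flag replaces the "if self.state" checks further down. A plausible reading of why: once state advances after every record, self.state becomes truthy as soon as the first record is read even on a full refresh, so the requested sync mode is the only reliable signal for choosing endpoints and payloads. A tiny self-contained sketch of that idea (the real stream appears to set _sync_mode via set_sync() from stream_slices, as the later hunks suggest):

from enum import Enum


class SyncMode(Enum):  # stand-in for airbyte_cdk.models.SyncMode
    full_refresh = "full_refresh"
    incremental = "incremental"


class ModeAwareStream:
    def __init__(self):
        self._sync_mode = SyncMode.full_refresh

    def set_sync(self, sync_mode: SyncMode) -> None:
        # Remember what the platform asked for, independent of whether any state exists yet.
        self._sync_mode = sync_mode

    @property
    def is_incremental_sync(self) -> bool:
        return self._sync_mode == SyncMode.incremental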
@@ -669,12 +674,10 @@ def read_records(
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
records = super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
-latest_cursor = None
for record in records:
cursor = self._field_to_datetime(record[self.updated_at_field])
-latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
+self._update_state(latest_cursor=cursor)
yield record
-self._update_state(latest_cursor=latest_cursor)

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
return self.state
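
Updating state after every record, rather than once per batch, means a checkpoint emitted at any point covers everything already yielded. The max() bookkeeping presumably lives inside _update_state (its surrounding lines are not part of this diff), so calling it per record is safe even when records arrive out of order. A small illustrative sketch of that behaviour, not the connector's code, with pendulum assumed:

import pendulum


def advance_bookmark(current_state, latest_cursor):
    """Return the new bookmark; it only ever moves forward."""
    return max(latest_cursor, current_state) if current_state else latest_cursor


state = None
for cursor in [pendulum.parse("2022-07-01"), pendulum.parse("2022-06-30")]:  # second record is older
    state = advance_bookmark(state, cursor)
print(state)  # still 2022-07-01 — the older record did not move the bookmark back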
@@ -712,7 +715,6 @@ def _update_state(self, latest_cursor):
if new_state != self._state:
logger.info(f"Advancing bookmark for {self.name} stream from {self._state} to {latest_cursor}")
self._state = new_state
-self._start_date = self._state

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
@@ -758,7 +760,7 @@ class CRMSearchStream(IncrementalStream, ABC):

@property
def url(self):
return f"/crm/v3/objects/{self.entity}/search" if self.state else f"/crm/v3/objects/{self.entity}"
return f"/crm/v3/objects/{self.entity}/search" if self.is_incremental_sync else f"/crm/v3/objects/{self.entity}"

def __init__(
self,
@@ -782,6 +784,7 @@ def search(

def _process_search(
self,
+since: int,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
@@ -790,12 +793,12 @@
properties_list = list(self.properties.keys())
payload = (
{
"filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}],
"filters": [{"value": since, "propertyName": self.last_modified_field, "operator": "GTE"}],
"sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}],
"properties": properties_list,
"limit": 100,
}
-if self.state
+if self.is_incremental_sync
else {}
)
if next_page_token:
@@ -818,10 +821,12 @@ def read_records(
pagination_complete = False
next_page_token = None

-latest_cursor = None
+# state is updated frequently, we need it frozen
+since_ts = int(self._state.timestamp() * 1000) if self._state else None
while not pagination_complete:
-if self.state:
+if self.is_incremental_sync:
records, raw_response = self._process_search(
+since=since_ts,
next_page_token=next_page_token,
stream_state=stream_state,
stream_slice=stream_slice,
@@ -838,21 +843,19 @@

for record in records:
cursor = self._field_to_datetime(record[self.updated_at_field])
-latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
+self._update_state(latest_cursor=cursor)
yield record

next_page_token = self.next_page_token(raw_response)
if not next_page_token:
pagination_complete = True
-elif self.state and next_page_token["payload"]["after"] >= 10000:
+elif self.is_incremental_sync and next_page_token["payload"]["after"] >= 10000:
# Hubspot documentation states that the search endpoints are limited to 10,000 total results
# for any given query. Attempting to page beyond 10,000 will result in a 400 error.
# https://developers.hubspot.com/docs/api/crm/search. We stop getting data at 10,000 and
# start a new search query with the latest state that has been collected.
-self._update_state(latest_cursor=latest_cursor)
next_page_token = None

-self._update_state(latest_cursor=latest_cursor)
# Always return an empty generator just in case no records were ever yielded
yield from []
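
Two things happen in this hunk: the lower bound of the search query (since_ts) is frozen before the loop, because state now advances with every record, and paging restarts with a fresh query whenever HubSpot's 10,000-results-per-search cap is reached. A simplified, self-contained sketch of that restart pattern — fetch_page is hypothetical and stands in for _process_search plus next_page_token:

def read_all(fetch_page, state):
    since = state["cursor"]          # frozen for the lifetime of one search query
    after = 0
    while True:
        records, next_after = fetch_page(since=since, after=after)
        for record in records:
            # Bookmark advances per record, exactly like _update_state above.
            state["cursor"] = max(state["cursor"], record["updatedAt"])
            yield record
        if next_after is None:
            break                    # no more pages for this query
        if next_after >= 10000:
            since = state["cursor"]  # restart a new search from the latest bookmark
            after = 0
        else:
            after = next_after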

Expand Down Expand Up @@ -1121,7 +1124,7 @@ class Engagements(IncrementalStream):

@property
def url(self):
-if self.state:
+if self.is_incremental_sync:
return "/engagements/v1/engagements/recent/modified"
return "/engagements/v1/engagements/paged"

@@ -1137,15 +1140,15 @@ def request_params(
params = {"count": 250}
if next_page_token:
params["offset"] = next_page_token["offset"]
-if self.state:
-params.update({"since": int(self._state.timestamp() * 1000), "count": 100})
+if self.is_incremental_sync:
+params.update({"since": stream_slice, "count": 100})
return params

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
-) -> Iterable[Optional[Mapping[str, Any]]]:
+) -> Iterable[int]:
self.set_sync(sync_mode)
-return [None]
+return [int((self._state or self._start_date).timestamp() * 1000)]

def read_records(
self,
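
For Engagements the since value is now computed once, in stream_slices, from the saved state (or the configured start date) and handed to request_params as the slice itself, so the request parameter does not drift while _state advances record by record. A minimal sketch of that design, with illustrative names and pendulum assumed:

import pendulum


class EngagementsSliceSketch:
    def __init__(self, start_date: str, state: str = None):
        self._start_date = pendulum.parse(start_date)
        self._state = pendulum.parse(state) if state else None

    def stream_slices(self):
        # A single epoch-millisecond slice, matching the new Iterable[int] return type.
        return [int((self._state or self._start_date).timestamp() * 1000)]

    def request_params(self, stream_slice: int):
        # The frozen slice value becomes the `since` query parameter.
        return {"since": stream_slice, "count": 100}


sketch = EngagementsSliceSketch("2022-01-01T00:00:00Z")
print(sketch.request_params(sketch.stream_slices()[0]))  # {'since': 1640995200000, 'count': 100}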
@@ -1158,7 +1161,6 @@ def read_records(
pagination_complete = False

next_page_token = None
-latest_cursor = None

while not pagination_complete:
response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token)
@@ -1169,11 +1171,11 @@

for record in records:
cursor = self._field_to_datetime(record[self.updated_at_field])
-latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
+self._update_state(latest_cursor=cursor)
yield record

next_page_token = self.next_page_token(response)
-if self.state and next_page_token and next_page_token["offset"] >= 10000:
+if self.is_incremental_sync and next_page_token and next_page_token["offset"] >= 10000:
# As per Hubspot documentation, the recent engagements endpoint will only return the 10K
# most recently updated engagements. Since they are returned sorted by `lastUpdated` in
# descending order, we stop getting records if we have already reached 10,000. Attempting
@@ -1187,8 +1189,6 @@ def read_records(
# Always return an empty generator just in case no records were ever yielded
yield from []

-self._update_state(latest_cursor=latest_cursor)


class Forms(Stream):
"""Marketing Forms, API v3
Expand Down
@@ -43,14 +43,12 @@ def time_sleep_mock(mocker):
def test_updated_at_field_non_exist_handler(requests_mock, common_params, fake_properties_list):
stream = ContactLists(**common_params)

created_at = "2022-03-25T16:43:11Z"
responses = [
{
"json": {
stream.data_field: [
-{
-"id": "test_id",
-"createdAt": "2022-03-25T16:43:11Z",
-},
+{"id": "test_id", "createdAt": created_at},
],
}
}
@@ -70,7 +68,7 @@ def test_updated_at_field_non_exist_handler(requests_mock, common_params, fake_p

_, stream_state = read_incremental(stream, {})

-expected = int(pendulum.parse(common_params["start_date"]).timestamp() * 1000)
+expected = int(pendulum.parse(created_at).timestamp() * 1000)

assert stream_state[stream.updated_at_field] == expected
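
The expectation changes because state is now driven by the records themselves: when updatedAt is missing, the stream appears to fall back to the record's createdAt, so the bookmark ends up at that timestamp rather than at the configured start date. A quick check of the expected value, assuming pendulum:

import pendulum

created_at = "2022-03-25T16:43:11Z"
print(int(pendulum.parse(created_at).timestamp() * 1000))  # 1648226591000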
