From c9b2c0e7e4876e0dbbd697e07ce35ec69fb4844b Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Fri, 29 Jul 2022 19:47:36 +0300 Subject: [PATCH] Source Hubspot: revert v0.1.78 (#15144) * Revert "Source Hubspot: implement new stream to read associations in incremental mode (#15099)" This reverts commit dd109debec80156c977a66b30c3cf2dc9b808844. * #359 rollback * hubspot: upd changelog * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-hubspot/Dockerfile | 2 +- .../integration_tests/conftest.py | 13 ---- .../integration_tests/test_associations.py | 55 -------------- .../source-hubspot/source_hubspot/source.py | 2 +- .../source-hubspot/source_hubspot/streams.py | 76 ++----------------- .../source-hubspot/unit_tests/test_source.py | 4 +- docs/integrations/sources/hubspot.md | 1 + 9 files changed, 11 insertions(+), 146 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-hubspot/integration_tests/conftest.py delete mode 100644 airbyte-integrations/connectors/source-hubspot/integration_tests/test_associations.py diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index aa6dc8f1d970..733dd0bde7ef 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -414,7 +414,7 @@ - name: HubSpot sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c dockerRepository: airbyte/source-hubspot - dockerImageTag: 0.1.78 + dockerImageTag: 0.1.79 documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot icon: hubspot.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 51bf3e99f1ce..a5fb19fe6a7d 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3705,7 +3705,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-hubspot:0.1.78" +- dockerImage: "airbyte/source-hubspot:0.1.79" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-hubspot/Dockerfile b/airbyte-integrations/connectors/source-hubspot/Dockerfile index 15a02eb013b2..7b1ecd78820a 100644 --- a/airbyte-integrations/connectors/source-hubspot/Dockerfile +++ b/airbyte-integrations/connectors/source-hubspot/Dockerfile @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.78 +LABEL io.airbyte.version=0.1.79 LABEL io.airbyte.name=airbyte/source-hubspot diff --git a/airbyte-integrations/connectors/source-hubspot/integration_tests/conftest.py b/airbyte-integrations/connectors/source-hubspot/integration_tests/conftest.py deleted file mode 100644 index c40a4656127d..000000000000 --- a/airbyte-integrations/connectors/source-hubspot/integration_tests/conftest.py +++ /dev/null @@ -1,13 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import json - -import pytest - - -@pytest.fixture(scope="session", name="config") -def config_fixture(): - with open("secrets/config.json", "r") as config_file: - return json.load(config_file) diff --git a/airbyte-integrations/connectors/source-hubspot/integration_tests/test_associations.py b/airbyte-integrations/connectors/source-hubspot/integration_tests/test_associations.py deleted file mode 100644 index 81621f3fcad0..000000000000 --- a/airbyte-integrations/connectors/source-hubspot/integration_tests/test_associations.py +++ /dev/null @@ -1,55 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import logging - -import pytest -from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type -from source_hubspot.source import SourceHubspot - - -@pytest.fixture -def source(): - return SourceHubspot() - - -@pytest.fixture -def associations(config, source): - streams = source.streams(config) - return {stream.name: getattr(stream, "associations", []) for stream in streams} - - -@pytest.fixture -def configured_catalog(config, source): - streams = source.streams(config) - return { - "streams": [ - { - "stream": stream.as_airbyte_stream(), - "sync_mode": "incremental", - "cursor_field": [stream.cursor_field], - "destination_sync_mode": "append", - } - for stream in streams - if stream.supports_incremental and getattr(stream, "associations", []) - ] - } - - -def test_incremental_read_fetches_associations(config, configured_catalog, source, associations): - messages = source.read(logging.getLogger("airbyte"), config, ConfiguredAirbyteCatalog.parse_obj(configured_catalog), {}) - - association_found = False - for message in messages: - if message and message.type != Type.RECORD: - continue - record = message.record - stream, data = record.stream, record.data - # assume at least one association id is present - stream_associations = associations[stream] - for association in stream_associations: - if data.get(association): - association_found = True - break - assert association_found diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py index 99f69ef7f61f..dff31ef8b0b4 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py @@ -82,7 +82,7 @@ def get_api(config: Mapping[str, Any]) -> API: return API(credentials=credentials) def get_common_params(self, config) -> Mapping[str, Any]: - start_date = config["start_date"] + start_date = config.get("start_date") credentials = config["credentials"] api = self.get_api(config=config) common_params = dict(api=api, start_date=start_date, credentials=credentials) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index edc3e15abca3..bcd5301571f2 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -245,15 +245,12 @@ def _property_wrapper(self) -> IURLPropertyRepresentation: return APIv1Property(properties) return APIv3Property(properties) - def __init__(self, api: API, start_date: Union[str, pendulum.datetime], credentials: Mapping[str, Any] = None, **kwargs): + def __init__(self, api: API, start_date: str = None, credentials: Mapping[str, Any] = None, **kwargs): super().__init__(**kwargs) self._api: API = api - self._credentials = credentials + self._start_date = pendulum.parse(start_date) - self._start_date = start_date - if isinstance(self._start_date, str): - self._start_date = pendulum.parse(self._start_date) - if self._credentials["credentials_title"] == API_KEY_CREDENTIALS: + if credentials["credentials_title"] == API_KEY_CREDENTIALS: self._session.params["hapikey"] = credentials.get("api_key") def backoff_time(self, response: requests.Response) -> Optional[float]: @@ -645,51 +642,6 @@ def _flat_associations(self, records: Iterable[MutableMapping]) -> Iterable[Muta yield record -class AssociationsStream(Stream): - """ - Designed to read associations of CRM objects during incremental syncs, since Search API does not support - retrieving associations. - """ - - http_method = "POST" - filter_old_records = False - - def __init__(self, parent_stream: Stream, identifiers: Iterable[Union[int, str]], *args, **kwargs): - super().__init__(*args, **kwargs) - self.parent_stream = parent_stream - self.identifiers = identifiers - - @property - def url(self): - """ - although it is not used, it needs to be implemented because it is an abstract property - """ - return "" - - def path( - self, - *, - stream_state: Mapping[str, Any] = None, - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> str: - return f"/crm/v4/associations/{self.parent_stream.entity}/{stream_slice}/batch/read" - - def scopes(self) -> Set[str]: - return self.parent_stream.scopes - - def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[str]: - return self.parent_stream.associations - - def request_body_json( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> Optional[Mapping]: - return {"inputs": [{"id": str(id_)} for id_ in self.identifiers]} - - class IncrementalStream(Stream, ABC): """Stream that supports state and incremental read""" @@ -855,24 +807,6 @@ def _process_search( return list(stream_records.values()), raw_response - def _read_associations(self, records: Iterable) -> Iterable[Mapping[str, Any]]: - records_by_pk = {record[self.primary_key]: record for record in records} - identifiers = list(map(lambda x: x[self.primary_key], records)) - associations_stream = AssociationsStream( - api=self._api, start_date=self._start_date, credentials=self._credentials, parent_stream=self, identifiers=identifiers - ) - slices = associations_stream.stream_slices(sync_mode=SyncMode.full_refresh) - - for _slice in slices: - logger.debug(f"Reading {_slice} associations of {self.entity}") - associations = associations_stream.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh) - for group in associations: - current_record = records_by_pk[group["from"]["id"]] - associations_list = current_record.get(_slice, []) - associations_list.extend(association["toObjectId"] for association in group["to"]) - current_record[_slice] = associations_list - return records_by_pk.values() - def read_records( self, sync_mode: SyncMode, @@ -892,15 +826,15 @@ def read_records( stream_state=stream_state, stream_slice=stream_slice, ) - records = self._read_associations(records) + else: records, raw_response = self._read_stream_records( stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token, ) - records = self._flat_associations(records) records = self._filter_old_records(records) + records = self._flat_associations(records) for record in records: cursor = self._field_to_datetime(record[self.updated_at_field]) diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py index a41048238b74..0a66137509c2 100644 --- a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py @@ -50,7 +50,7 @@ def test_check_connection_empty_config(config): def test_check_connection_invalid_config(config): config.pop("start_date") - with pytest.raises(KeyError): + with pytest.raises(TypeError): SourceHubspot().check_connection(logger, config=config) @@ -406,8 +406,6 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req requests_mock.register_uri("POST", test_stream.url, responses) test_stream._sync_mode = None requests_mock.register_uri("GET", "/properties/v2/company/properties", properties_response) - requests_mock.register_uri("POST", "/crm/v4/associations/company/contacts/batch/read", [{"status_code": 200, "json": {"results": []}}]) - records, _ = read_incremental(test_stream, {}) # The stream should not attempt to get more than 10K records. # Instead, it should use the new state to start a new search query. diff --git a/docs/integrations/sources/hubspot.md b/docs/integrations/sources/hubspot.md index cf970fce03d9..d3889fe5aeff 100644 --- a/docs/integrations/sources/hubspot.md +++ b/docs/integrations/sources/hubspot.md @@ -129,6 +129,7 @@ Now that you have set up the HubSpot source connector, check out the following H | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------| +| 0.1.79 | 2022-07-28 | [15144](https://github.com/airbytehq/airbyte/pull/15144) | Revert v0.1.78 due to permission issues | | 0.1.78 | 2022-07-28 | [15099](https://github.com/airbytehq/airbyte/pull/15099) | Fix to fetch associations when using incremental mode | | 0.1.77 | 2022-07-26 | [15035](https://github.com/airbytehq/airbyte/pull/15035) | Make PropertyHistory stream read historic data not limited to 30 days | | 0.1.76 | 2022-07-25 | [14999](https://github.com/airbytehq/airbyte/pull/14999) | Partially revert changes made in v0.1.75 |