Skip to content

Commit

Permalink
#379 source hubspot: fix 401 when reading associations
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d committed Aug 1, 2022
1 parent fc8816f commit 0fa38ef
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 12 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.78
LABEL io.airbyte.version=0.1.80
LABEL io.airbyte.name=airbyte/source-hubspot
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@
def config_fixture():
with open("secrets/config.json", "r") as config_file:
return json.load(config_file)


@pytest.fixture(scope="session", name="oauth_config")
def oauth_config_fixture():
with open("secrets/config_oauth.json", "r") as config_file:
return json.load(config_file)
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ def configured_catalog(config, source):
}


def test_incremental_read_fetches_associations(config, configured_catalog, source, associations):
messages = source.read(logging.getLogger("airbyte"), config, ConfiguredAirbyteCatalog.parse_obj(configured_catalog), {})
@pytest.mark.parametrize("auth", ("api_key", "oauth"))
def test_incremental_read_fetches_associations(auth, config, oauth_config, configured_catalog, source, associations):
configuration = oauth_config if auth == "oauth" else config
messages = source.read(logging.getLogger("airbyte"), configuration, ConfiguredAirbyteCatalog.parse_obj(configured_catalog), {})

association_found = False
for message in messages:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from airbyte_cdk.utils.event_timing import create_timer
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from requests import HTTPError
from source_hubspot.constants import API_KEY_CREDENTIALS, OAUTH_CREDENTIALS, PRIVATE_APP_CREDENTIALS
from source_hubspot.constants import API_KEY_CREDENTIALS
from source_hubspot.streams import (
API,
Campaigns,
Expand Down Expand Up @@ -85,12 +85,7 @@ def get_common_params(self, config) -> Mapping[str, Any]:
start_date = config["start_date"]
credentials = config["credentials"]
api = self.get_api(config=config)
common_params = dict(api=api, start_date=start_date, credentials=credentials)

credentials_title = credentials.get("credentials_title")
if credentials_title == OAUTH_CREDENTIALS or credentials_title == PRIVATE_APP_CREDENTIALS:
common_params["authenticator"] = api.get_authenticator()
return common_params
return dict(api=api, start_date=start_date, credentials=credentials)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
credentials = config.get("credentials", {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,11 @@ def __init__(self, api: API, start_date: Union[str, pendulum.datetime], credenti
self._start_date = start_date
if isinstance(self._start_date, str):
self._start_date = pendulum.parse(self._start_date)
if self._credentials["credentials_title"] == API_KEY_CREDENTIALS:
creds_title = self._credentials["credentials_title"]
if creds_title == API_KEY_CREDENTIALS:
self._session.params["hapikey"] = credentials.get("api_key")
elif creds_title in (OAUTH_CREDENTIALS, PRIVATE_APP_CREDENTIALS):
self._authenticator = api.get_authenticator()

def backoff_time(self, response: requests.Response) -> Optional[float]:
if response.status_code == codes.too_many_requests:
Expand Down Expand Up @@ -864,7 +867,7 @@ def _read_associations(self, records: Iterable) -> Iterable[Mapping[str, Any]]:
slices = associations_stream.stream_slices(sync_mode=SyncMode.full_refresh)

for _slice in slices:
logger.debug(f"Reading {_slice} associations of {self.entity}")
logger.info(f"Reading {_slice} associations of {self.entity}")
associations = associations_stream.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh)
for group in associations:
current_record = records_by_pk[group["from"]["id"]]
Expand Down

0 comments on commit 0fa38ef

Please sign in to comment.