Skip to content

Commit

Permalink
check_scopes added
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr committed May 10, 2022
1 parent 6247d65 commit a747c29
Showing 1 changed file with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple

import requests
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.deprecated.base_source import ConfiguredAirbyteStream
Expand Down Expand Up @@ -44,22 +45,60 @@
Workflows,
)

SCOPES = [
"automation",
"content",
"crm.lists.read",
"crm.objects.companies.read",
"crm.objects.contacts.read",
"crm.objects.deals.read",
"crm.objects.feedback_submissions.read",
"crm.objects.owners.read",
"crm.schemas.companies.read",
"crm.schemas.contacts.read",
"crm.schemas.deals.read",
"e-commerce",
"files",
"files.ui_hidden.read",
"forms",
"forms-uploaded-files",
"sales-email-read",
"tickets",
]


class SourceHubspot(AbstractSource):
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
"""Check connection"""
common_params = self.get_common_params(config=config)
if common_params.get("authenticator"):
access_token = common_params["authenticator"].get_access_token()
url = f"https://api.hubapi.com/oauth/v1/access-tokens/{access_token}"
try:
response = requests.get(url=url)
response.raise_for_status()
return self.check_scopes(response.json())
except Exception as e:
return False, repr(e)

alive = True
error_msg = None
common_params = self.get_common_params(config=config)
try:
contacts = Contacts(**common_params)
_ = contacts.properties
except HTTPError as error:
alive = False
error_msg = repr(error)

return alive, error_msg

@staticmethod
def check_scopes(response_json):
granted_scopes = response_json["scopes"]
missed_scopes = set(SCOPES) - set(granted_scopes)
if missed_scopes:
return False, "missed required scopes: " + ", ".join(sorted(missed_scopes))
return True, None

@staticmethod
def get_api(config: Mapping[str, Any]) -> API:
credentials = config.get("credentials", {})
Expand Down

0 comments on commit a747c29

Please sign in to comment.