Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Low-Code: added SessionTokenAuthenticator #19716

Merged
merged 7 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.13.0
Low-code: Add SessionTokenAuthenticator

## 0.12.4
Lookback window should applied when a state is supplied as well

Expand Down
125 changes: 125 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
#

import base64
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Union

import requests
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
from cachetools import TTLCache, cached
from dataclasses_jsonschema import JsonSchemaMixin


Expand Down Expand Up @@ -115,3 +118,125 @@ def token(self) -> str:
auth_string = f"{self._username.eval(self.config)}:{self._password.eval(self.config)}".encode("utf8")
b64_encoded = base64.b64encode(auth_string).decode("utf8")
return f"Basic {b64_encoded}"


"""
maxsize - The maximum size of the cache
ttl - time-to-live value in seconds
docs https://cachetools.readthedocs.io/en/latest/
maxsize=1000 - when the cache is full, in this case more than 1000,
i.e. by adding another item the cache would exceed its maximum size, the cache must choose which item(s) to discard
ttl=86400 means that cached token will live for 86400 seconds (one day)
"""
cacheSessionTokenAuthenticator = TTLCache(maxsize=1000, ttl=86400)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment on how those values were chosen for posterity? Is it worth parameterizing them?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done



@cached(cacheSessionTokenAuthenticator)
def get_new_session_token(api_url: str, username: str, password: str, response_key: str) -> str:
"""
This method retrieves session token from api by username and password for SessionTokenAuthenticator.
It's cashed to avoid a multiple calling by sync and updating session token every stream sync.
Args:
api_url: api url for getting new session token
username: username for auth
password: password for auth
response_key: field name in response to retrieve a session token

Returns:
session token
"""
response = requests.post(
f"{api_url}",
headers={"Content-Type": "application/json"},
json={"username": username, "password": password},
)
response.raise_for_status()
if not response.ok:
raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}")
return response.json()[response_key]


@dataclass
class SessionTokenAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Builds auth based on session tokens.
A session token is a random value generated by a server to identify
a specific user for the duration of one interaction session.

The header is of the form
`"Specific Header": "Session Token Value"`

Attributes:
api_url (Union[InterpolatedString, str]): Base api url of source
username (Union[InterpolatedString, str]): The username
config (Config): The user-provided configuration as specified by the source's spec
password (Union[InterpolatedString, str]): The password
header (Union[InterpolatedString, str]): Specific header of source for providing session token
options (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
session_token (Union[InterpolatedString, str]): Session token generated by user
session_token_response_key (Union[InterpolatedString, str]): Key for retrieving session token from api response
login_url (Union[InterpolatedString, str]): Url fot getting a specific session token
validate_session_url (Union[InterpolatedString, str]): Url to validate passed session token
"""

api_url: Union[InterpolatedString, str]
header: Union[InterpolatedString, str]
session_token: Union[InterpolatedString, str]
session_token_response_key: Union[InterpolatedString, str]
username: Union[InterpolatedString, str]
config: Config
options: InitVar[Mapping[str, Any]]
login_url: Union[InterpolatedString, str]
validate_session_url: Union[InterpolatedString, str]
password: Union[InterpolatedString, str] = ""

def __post_init__(self, options):
self._username = InterpolatedString.create(self.username, options=options)
self._password = InterpolatedString.create(self.password, options=options)
self._api_url = InterpolatedString.create(self.api_url, options=options)
self._header = InterpolatedString.create(self.header, options=options)
self._session_token = InterpolatedString.create(self.session_token, options=options)
self._session_token_response_key = InterpolatedString.create(self.session_token_response_key, options=options)
self._login_url = InterpolatedString.create(self.login_url, options=options)
self._validate_session_url = InterpolatedString.create(self.validate_session_url, options=options)

self.logger = logging.getLogger("airbyte")

@property
def auth_header(self) -> str:
return self._header.eval(self.config)

@property
def token(self) -> str:
if self._session_token.eval(self.config):
if self.is_valid_session_token():
return self._session_token.eval(self.config)
if self._password.eval(self.config) and self._username.eval(self.config):
username = self._username.eval(self.config)
password = self._password.eval(self.config)
session_token_response_key = self._session_token_response_key.eval(self.config)
api_url = f"{self._api_url.eval(self.config)}{self._login_url.eval(self.config)}"

self.logger.info("Using generated session token by username and password")
return get_new_session_token(api_url, username, password, session_token_response_key)

raise ConnectionError("Invalid credentials: session token is not valid or provide username and password")

def is_valid_session_token(self) -> bool:
try:
response = requests.get(
f"{self._api_url.eval(self.config)}{self._validate_session_url.eval(self.config)}",
headers={self.auth_header: self._session_token.eval(self.config)},
)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if e.response.status_code == requests.codes["unauthorized"]:
self.logger.info(f"Unable to connect by session token from config due to {str(e)}")
return False
else:
raise ConnectionError(f"Error while validating session token: {e}")
if response.ok:
self.logger.info("Connection check for source is successful.")
return True
else:
raise ConnectionError(f"Failed to retrieve new session token, response code {response.status_code} because {response.reason}")
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@
},
{
"$ref": "#/definitions/BasicHttpAuthenticator"
},
{
"$ref": "#/definitions/SessionTokenAuthenticator"
}
]
},
Expand Down Expand Up @@ -629,6 +632,75 @@
],
"description": "\n Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64\n https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme\n\n The header is of the form\n `\"Authorization\": \"Basic <encoded_credentials>\"`\n\n Attributes:\n username (Union[InterpolatedString, str]): The username\n config (Config): The user-provided configuration as specified by the source's spec\n password (Union[InterpolatedString, str]): The password\n options (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation\n "
},
"SessionTokenAuthenticator": {
"allOf": [
{
"$ref": "#/definitions/DeclarativeAuthenticator"
},
{
"type": "object",
"required": ["api_url", "header"],
"properties": {
"api_url": {
"anyOf": [
{
"$ref": "#/definitions/InterpolatedString"
},
{
"type": "string"
}
]
},
"session_token": {
"anyOf": [
{
"$ref": "#/definitions/InterpolatedString"
},
{
"type": "string"
}
],
"default": ""
},
"username": {
"anyOf": [
{
"$ref": "#/definitions/InterpolatedString"
},
{
"type": "string"
}
],
"default": ""
},
"password": {
"anyOf": [
{
"$ref": "#/definitions/InterpolatedString"
},
{
"type": "string"
}
],
"default": ""
},
"header": {
"anyOf": [
{
"$ref": "#/definitions/InterpolatedString"
},
{
"type": "string"
}
]
},
"config": {
"type": "object"
}
}
}
]
},
"CompositeErrorHandler": {
"allOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@

from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth
from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator
from airbyte_cdk.sources.declarative.auth.token import ApiKeyAuthenticator, BasicHttpAuthenticator, BearerAuthenticator
from airbyte_cdk.sources.declarative.auth.token import (
ApiKeyAuthenticator,
BasicHttpAuthenticator,
BearerAuthenticator,
SessionTokenAuthenticator,
)
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
Expand Down Expand Up @@ -78,6 +83,7 @@
"SingleSlice": SingleSlice,
"Spec": Spec,
"SubstreamSlicer": SubstreamSlicer,
"SessionTokenAuthenticator": SessionTokenAuthenticator,
"WaitUntilTimeFromHeader": WaitUntilTimeFromHeaderBackoffStrategy,
"WaitTimeFromHeader": WaitTimeFromHeaderBackoffStrategy,
}
1 change: 1 addition & 0 deletions airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"requests_cache",
"Deprecated~=1.2",
"Jinja2~=3.1.2",
"cachetools",
],
python_requires=">=3.9",
extras_require={
Expand Down
Loading