Skip to content

Commit

Permalink
[low-code-connectors] Disable parse-time interpolation in favor of runtime-only (airbytehq#14923)
Browse files Browse the repository at this point in the history

* abstract auth token

* basichttp

* remove prints

* docstrings

* get rid of parse-time interpolation

* always pass options through

* delete print

* delete misleading comment

* delete note

* reset

* pass down options

* delete duplicate file

* missing test

* refactor test

* rename to '$options'

* rename to ''

* interpolatedauth

* fix tests

* fix

* docstrings

* update docstring

* docstring

* update docstring

* remove extra field

* undo

* rename to runtime_parameters

* docstring

* update

* / -> *

* update template

* rename to options

* Add examples

* update docstring

* Update test

* newlines

* rename kwargs to options

* options init param

* delete duplicate line

* type hints

* update docstring

* Revert "delete duplicate line"

This reverts commit 4255d5b.

* delete duplicate code from bad merge

* rename file

* bump cdk version
  • Loading branch information
girarda authored and UsmanAli99 committed Aug 3, 2022
1 parent 27f39c7 commit bf251f1
Show file tree
Hide file tree
Showing 42 changed files with 259 additions and 284 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.68
- Replace parse-time string interpolation with run-time interpolation in YAML-based sources

## 0.1.67
- Add support for declarative token authenticator.

Expand Down
22 changes: 12 additions & 10 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ def __init__(
config: Mapping[str, Any],
scopes: Optional[List[str]] = None,
token_expiry_date: Optional[Union[InterpolatedString, str]] = None,
access_token_name: Union[InterpolatedString, str] = InterpolatedString("access_token"),
expires_in_name: Union[InterpolatedString, str] = InterpolatedString("expires_in"),
access_token_name: Union[InterpolatedString, str] = "access_token",
expires_in_name: Union[InterpolatedString, str] = "expires_in",
refresh_request_body: Optional[Mapping[str, Any]] = None,
**options: Optional[Mapping[str, Any]],
):
"""
:param token_refresh_endpoint: The endpoint to refresh the access token
Expand All @@ -41,19 +42,20 @@ def __init__(
:param access_token_name: The field to extract access token from in the response
:param expires_in_name: The field to extract expires_in from in the response
:param refresh_request_body: The request body to send in the refresh request
:param options: Additional runtime parameters to be used for string interpolation
"""
self.config = config
self.token_refresh_endpoint = InterpolatedString.create(token_refresh_endpoint)
self.client_secret = InterpolatedString.create(client_secret)
self.client_id = InterpolatedString.create(client_id)
self.refresh_token = InterpolatedString.create(refresh_token)
self.token_refresh_endpoint = InterpolatedString.create(token_refresh_endpoint, options=options)
self.client_secret = InterpolatedString.create(client_secret, options=options)
self.client_id = InterpolatedString.create(client_id, options=options)
self.refresh_token = InterpolatedString.create(refresh_token, options=options)
self.scopes = scopes
self.access_token_name = InterpolatedString.create(access_token_name)
self.expires_in_name = InterpolatedString.create(expires_in_name)
self.refresh_request_body = InterpolatedMapping(refresh_request_body or {})
self.access_token_name = InterpolatedString.create(access_token_name, options=options)
self.expires_in_name = InterpolatedString.create(expires_in_name, options=options)
self.refresh_request_body = InterpolatedMapping(refresh_request_body or {}, options=options)

self.token_expiry_date = (
pendulum.parse(InterpolatedString.create(token_expiry_date).eval(self.config))
pendulum.parse(InterpolatedString.create(token_expiry_date, options=options).eval(self.config))
if token_expiry_date
else pendulum.now().subtract(days=1)
)
Expand Down
35 changes: 25 additions & 10 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
#

import base64
from typing import Union
from typing import Any, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.streams.http.requests_native_auth.abtract_token import AbstractHeaderAuthenticator
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator


class ApiKeyAuthenticator(AbstractHeaderAuthenticator):
Expand All @@ -24,14 +24,21 @@ class ApiKeyAuthenticator(AbstractHeaderAuthenticator):
"""

def __init__(self, header: Union[InterpolatedString, str], token: Union[InterpolatedString, str], config: Config):
def __init__(
self,
header: Union[InterpolatedString, str],
token: Union[InterpolatedString, str],
config: Config,
**options: Optional[Mapping[str, Any]],
):
"""
:param header: Header key to set on the HTTP requests
:param token: Header value to set on the HTTP requests
:param config: The user-provided configuration as specified by the source's spec
:param options: Additional runtime parameters to be used for string interpolation
"""
self._header = InterpolatedString.create(header)
self._token = InterpolatedString.create(token)
self._header = InterpolatedString.create(header, options=options)
self._token = InterpolatedString.create(token, options=options)
self._config = config

@property
Expand All @@ -51,12 +58,13 @@ class BearerAuthenticator(AbstractHeaderAuthenticator):
`"Authorization": "Bearer <token>"`
"""

def __init__(self, token: Union[InterpolatedString, str], config: Config):
def __init__(self, token: Union[InterpolatedString, str], config: Config, **options: Optional[Mapping[str, Any]]):
"""
:param token: The bearer token
:param config: The user-provided configuration as specified by the source's spec
:param options: Additional runtime parameters to be used for string interpolation
"""
self._token = InterpolatedString.create(token)
self._token = InterpolatedString.create(token, options=options)
self._config = config

@property
Expand All @@ -77,14 +85,21 @@ class BasicHttpAuthenticator(AbstractHeaderAuthenticator):
`"Authorization": "Basic <encoded_credentials>"`
"""

def __init__(self, username: Union[InterpolatedString, str], config: Config, password: Union[InterpolatedString, str] = ""):
def __init__(
self,
username: Union[InterpolatedString, str],
config: Config,
password: Union[InterpolatedString, str] = "",
**options: Optional[Mapping[str, Any]],
):
"""
:param username: The username
:param config: The user-provided configuration as specified by the source's spec
:param password: The password
:param options: Additional runtime parameters to be used for string interpolation
"""
self._username = InterpolatedString.create(username)
self._password = InterpolatedString.create(password)
self._username = InterpolatedString.create(username, options=options)
self._password = InterpolatedString.create(password, options=options)
self._config = config

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

import logging
from typing import Any, List, Mapping, Tuple
from typing import Any, List, Mapping, Optional, Tuple

from airbyte_cdk.models.airbyte_protocol import SyncMode
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
Expand All @@ -15,11 +15,13 @@ class CheckStream(ConnectionChecker):
Checks the connections by trying to read records from one or many of the streams selected by the developer
"""

def __init__(self, stream_names: List[str]):
def __init__(self, stream_names: List[str], **options: Optional[Mapping[str, Any]]):
"""
:param stream_names: name of streams to read records from
:param options: Additional runtime parameters to be used for string interpolation
"""
self._stream_names = set(stream_names)
self._options = options

def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
streams = source.streams(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import inspect

from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
OPTIONS_STR = "$options"


def create(func, /, *args, **keywords):
Expand All @@ -15,6 +15,7 @@ def create(func, /, *args, **keywords):
The interpolation will take in kwargs, and config as parameters that can be accessed through interpolating.
If any of the parameters are also create functions, they will also be created.
kwargs are propagated to the recursive method calls
:param func: Function
:param args:
:param keywords:
Expand All @@ -28,21 +29,12 @@ def newfunc(*fargs, **fkeywords):
# config is a special keyword used for interpolation
config = all_keywords.pop("config", None)

# options is a special keyword used for interpolation and propagation
if "options" in all_keywords:
options = all_keywords.pop("options")
# $options is a special keyword used for interpolation and propagation
if OPTIONS_STR in all_keywords:
options = all_keywords.get(OPTIONS_STR)
else:
options = dict()

# create object's partial parameters
fully_created = _create_inner_objects(all_keywords, options)

# interpolate the parameters
interpolated_keywords = InterpolatedMapping(fully_created).eval(config, **{"options": options})
interpolated_keywords = {k: v for k, v in interpolated_keywords.items() if v}

all_keywords.update(interpolated_keywords)

# if config is not none, add it back to the keywords mapping
if config is not None:
all_keywords["config"] = config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

import datetime as dt
from typing import Union
from typing import Any, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString

Expand All @@ -21,40 +21,44 @@ def __init__(
datetime_format: str = "",
min_datetime: Union[InterpolatedString, str] = "",
max_datetime: Union[InterpolatedString, str] = "",
**options: Optional[Mapping[str, Any]],
):
"""
:param datetime: InterpolatedString or string representing the datetime in the format specified by `datetime_format`
:param datetime_format: Format of the datetime passed as argument
:param min_datetime: InterpolatedString or string representing the min datetime
:param max_datetime: InterpolatedString or string representing the max datetime
:param options: Additional runtime parameters to be used for string interpolation
"""
self._datetime_interpolator = InterpolatedString.create(datetime)
self._datetime_interpolator = InterpolatedString.create(datetime, options=options)
self._datetime_format = datetime_format
self._timezone = dt.timezone.utc
self._min_datetime_interpolator = InterpolatedString.create(min_datetime) if min_datetime else None
self._max_datetime_interpolator = InterpolatedString.create(max_datetime) if max_datetime else None
self._min_datetime_interpolator = InterpolatedString.create(min_datetime, options=options) if min_datetime else None
self._max_datetime_interpolator = InterpolatedString.create(max_datetime, options=options) if max_datetime else None

def get_datetime(self, config, **kwargs) -> dt.datetime:
def get_datetime(self, config, **additional_options) -> dt.datetime:
"""
Evaluates and returns the datetime
:param config: The user-provided configuration as specified by the source's spec
:param kwargs: Additional arguments to be passed to the strings for interpolation
:param additional_options: Additional arguments to be passed to the strings for interpolation
:return: The evaluated datetime
"""
# We apply a default datetime format here instead of at instantiation, so it can be set by the parent first
datetime_format = self._datetime_format
if not datetime_format:
datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z"

time = dt.datetime.strptime(self._datetime_interpolator.eval(config, **kwargs), datetime_format).replace(tzinfo=self._timezone)
time = dt.datetime.strptime(self._datetime_interpolator.eval(config, **additional_options), datetime_format).replace(
tzinfo=self._timezone
)

if self._min_datetime_interpolator:
min_time = dt.datetime.strptime(self._min_datetime_interpolator.eval(config, **kwargs), datetime_format).replace(
min_time = dt.datetime.strptime(self._min_datetime_interpolator.eval(config, **additional_options), datetime_format).replace(
tzinfo=self._timezone
)
time = max(time, min_time)
if self._max_datetime_interpolator:
max_time = dt.datetime.strptime(self._max_datetime_interpolator.eval(config, **kwargs), datetime_format).replace(
max_time = dt.datetime.strptime(self._max_datetime_interpolator.eval(config, **additional_options), datetime_format).replace(
tzinfo=self._timezone
)
time = min(time, max_time)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ class JelloExtractor:

default_transform = "_"

def __init__(self, transform: Union[InterpolatedString, str], config: Config, decoder: Decoder = JsonDecoder(), kwargs=None):
def __init__(self, transform: Union[InterpolatedString, str], config: Config, decoder: Decoder = JsonDecoder()):
"""
:param transform: The Jello query to evaluate on the decoded response
:param config: The user-provided configuration as specified by the source's spec
:param decoder: The decoder responsible for transforming the response into a Mapping
:param kwargs: Additional arguments to be passed to the strings for interpolation
"""

if isinstance(transform, str):
Expand All @@ -35,9 +34,8 @@ def __init__(self, transform: Union[InterpolatedString, str], config: Config, de
self._transform = transform
self._decoder = decoder
self._config = config
self._kwargs = kwargs or dict()

def extract_records(self, response: requests.Response) -> List[Record]:
response_body = self._decoder.decode(response)
script = self._transform.eval(self._config, **{"kwargs": self._kwargs})
script = self._transform.eval(self._config)
return jello_lib.pyquery(response_body, script)
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ class RecordFilter:
Filter applied on a list of Records
"""

def __init__(self, config: Config, condition: str = ""):
def __init__(self, config: Config, condition: str = "", **options: Optional[Mapping[str, Any]]):
"""
:param config: The user-provided configuration as specified by the source's spec
:param condition: The string representing the predicate to filter a record. Records will be removed if evaluated to False
:param options: Additional runtime parameters to be used for string interpolation
"""
self._config = config
self._filter_interpolator = InterpolatedBoolean(condition)
self._options = options

def filter_records(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ class RecordSelector(HttpSelector):
records based on a heuristic.
"""

def __init__(self, extractor: JelloExtractor, record_filter: RecordFilter = None):
def __init__(self, extractor: JelloExtractor, record_filter: RecordFilter = None, **options: Optional[Mapping[str, Any]]):
"""
:param extractor: The record extractor responsible for extracting records from a response
:param record_filter: The record filter responsible for filtering extracted records
:param options: Additional runtime parameters to be used for string interpolation
"""
self._extractor = extractor
self._record_filter = record_filter
self._options = options

def select_records(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Final, List
from typing import Any, Final, List, Mapping, Optional

from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.types import Config
Expand All @@ -16,26 +16,28 @@ class InterpolatedBoolean:
The string will be evaluated as False if it interpolates to a value in {FALSE_VALUES}
"""

def __init__(self, condition: str):
def __init__(self, condition: str, **options: Optional[Mapping[str, Any]]):
"""
:param condition: The string representing the condition to evaluate to a boolean
:param options: Additional runtime parameters to be used for string interpolation
"""
self._condition = condition
self._default = "False"
self._interpolation = JinjaInterpolation()
self._options = options

def eval(self, config: Config, **kwargs):
def eval(self, config: Config, **additional_options):
"""
Interpolates the predicate condition string using the config and other optional arguments passed as parameter.
:param config: The user-provided configuration as specified by the source's spec
:param kwargs: Optional parameters used for interpolation
:param additional_options: Optional parameters used for interpolation
:return: The boolean result of evaluating the interpolated condition
"""
if isinstance(self._condition, bool):
return self._condition
else:
evaluated = self._interpolation.eval(self._condition, config, self._default, **kwargs)
evaluated = self._interpolation.eval(self._condition, config, self._default, options=self._options, **additional_options)
if evaluated in FALSE_VALUES:
return False
# The presence of a value is generally regarded as truthy, so we treat it as such
Expand Down
Loading

0 comments on commit bf251f1

Please sign in to comment.