From bf251f18fb99080833b9a8221e970ad172489de8 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 28 Jul 2022 08:57:17 -0700 Subject: [PATCH] [low-code-connectors] Disable parse-time interpolation in favor of runtime-only (#14923) * abstract auth token * basichttp * remove prints * docstrings * get rid of parse-time interpolation * always pass options through * delete print * delete misleading comment * delete note * reset * pass down options * delete duplicate file * missing test * refactor test * rename to '$options' * rename to '' * interpolatedauth * fix tests * fix * docstrings * update docstring * docstring * update docstring * remove extra field * undo * rename to runtime_parameters * docstring * update * / -> * * update template * rename to options * Add examples * update docstring * Update test * newlines * rename kwargs to options * options init param * delete duplicate line * type hints * update docstring * Revert "delete duplicate line" This reverts commit 4255d5b3469fbd426be103a215e9ba172abdfbef. 
* delete duplicate code from bad merge * rename file * bump cdk version --- airbyte-cdk/python/CHANGELOG.md | 3 ++ .../sources/declarative/auth/oauth.py | 22 ++++---- .../sources/declarative/auth/token.py | 35 ++++++++---- .../declarative/checks/check_stream.py | 6 ++- .../sources/declarative/create_partial.py | 18 ++----- .../declarative/datetime/min_max_datetime.py | 22 ++++---- .../sources/declarative/extractors/jello.py | 6 +-- .../declarative/extractors/record_filter.py | 4 +- .../declarative/extractors/record_selector.py | 4 +- .../interpolation/interpolated_boolean.py | 12 +++-- .../interpolation/interpolated_mapping.py | 21 ++++---- .../interpolation/interpolated_string.py | 12 +++-- .../interpolation/interpolation.py | 6 +-- .../declarative/interpolation/jinja.py | 4 +- .../sources/declarative/parsers/factory.py | 51 ++++++++++++++---- .../declarative/requesters/http_requester.py | 3 ++ .../interpolated_request_input_provider.py | 15 ++++-- .../requesters/paginators/limit_paginator.py | 18 ++++--- .../strategies/cursor_pagination_strategy.py | 4 +- .../retrievers/simple_retriever.py | 3 +- .../sources/declarative/schema/json_schema.py | 12 ++--- .../stream_slicers/datetime_stream_slicer.py | 8 +-- .../stream_slicers/list_stream_slicer.py | 7 +-- .../stream_slicers/substream_slicer.py | 7 ++- .../declarative/transformations/add_fields.py | 10 ++-- .../http/requests_native_auth/__init__.py | 7 +-- .../{abtract_token.py => abstract_token.py} | 9 ++-- .../http/requests_native_auth/token.py | 2 +- airbyte-cdk/python/setup.py | 2 +- .../checks/test_interpolated_mapping.py | 26 --------- .../checks/test_interpolated_string.py | 26 --------- .../declarative/extractors/test_jello.py | 4 +- .../extractors/test_record_selector.py | 2 +- .../test_interpolated_mapping.py | 24 ++++++--- .../interpolation/test_interpolated_string.py | 36 ++++++------- .../paginators/test_interpolated_string.py | 54 ------------------- .../requesters/test_http_requester.py | 4 +- 
.../test_cartesian_product_stream_slicer.py | 2 +- .../declarative/test_create_partial.py | 2 +- .../sources/declarative/test_factory.py | 26 ++++----- .../source-configuration-based/setup.py.hbs | 2 +- .../{{snakeCase name}}.yaml.hbs | 2 +- 42 files changed, 259 insertions(+), 284 deletions(-) rename airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/{abtract_token.py => abstract_token.py} (74%) delete mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_mapping.py delete mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_string.py delete mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_interpolated_string.py diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 964339b72254..2df5255f8265 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.68 +- Replace parse-time string interpolation with run-time interpolation in YAML-based sources + ## 0.1.67 - Add support declarative token authenticator. 
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py index c06d40578c00..d20864f47eb1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/oauth.py @@ -26,9 +26,10 @@ def __init__( config: Mapping[str, Any], scopes: Optional[List[str]] = None, token_expiry_date: Optional[Union[InterpolatedString, str]] = None, - access_token_name: Union[InterpolatedString, str] = InterpolatedString("access_token"), - expires_in_name: Union[InterpolatedString, str] = InterpolatedString("expires_in"), + access_token_name: Union[InterpolatedString, str] = "access_token", + expires_in_name: Union[InterpolatedString, str] = "expires_in", refresh_request_body: Optional[Mapping[str, Any]] = None, + **options: Optional[Mapping[str, Any]], ): """ :param token_refresh_endpoint: The endpoint to refresh the access token @@ -41,19 +42,20 @@ def __init__( :param access_token_name: THe field to extract access token from in the response :param expires_in_name:The field to extract expires_in from in the response :param refresh_request_body: The request body to send in the refresh request + :param options: Additional runtime parameters to be used for string interpolation """ self.config = config - self.token_refresh_endpoint = InterpolatedString.create(token_refresh_endpoint) - self.client_secret = InterpolatedString.create(client_secret) - self.client_id = InterpolatedString.create(client_id) - self.refresh_token = InterpolatedString.create(refresh_token) + self.token_refresh_endpoint = InterpolatedString.create(token_refresh_endpoint, options=options) + self.client_secret = InterpolatedString.create(client_secret, options=options) + self.client_id = InterpolatedString.create(client_id, options=options) + self.refresh_token = InterpolatedString.create(refresh_token, options=options) self.scopes = scopes - 
self.access_token_name = InterpolatedString.create(access_token_name) - self.expires_in_name = InterpolatedString.create(expires_in_name) - self.refresh_request_body = InterpolatedMapping(refresh_request_body or {}) + self.access_token_name = InterpolatedString.create(access_token_name, options=options) + self.expires_in_name = InterpolatedString.create(expires_in_name, options=options) + self.refresh_request_body = InterpolatedMapping(refresh_request_body or {}, options=options) self.token_expiry_date = ( - pendulum.parse(InterpolatedString.create(token_expiry_date).eval(self.config)) + pendulum.parse(InterpolatedString.create(token_expiry_date, options=options).eval(self.config)) if token_expiry_date else pendulum.now().subtract(days=1) ) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py index 2b28f7f85941..30520b03d971 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py @@ -3,11 +3,11 @@ # import base64 -from typing import Union +from typing import Any, Mapping, Optional, Union from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.types import Config -from airbyte_cdk.sources.streams.http.requests_native_auth.abtract_token import AbstractHeaderAuthenticator +from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator class ApiKeyAuthenticator(AbstractHeaderAuthenticator): @@ -24,14 +24,21 @@ class ApiKeyAuthenticator(AbstractHeaderAuthenticator): """ - def __init__(self, header: Union[InterpolatedString, str], token: Union[InterpolatedString, str], config: Config): + def __init__( + self, + header: Union[InterpolatedString, str], + token: Union[InterpolatedString, str], + config: Config, + **options: Optional[Mapping[str, Any]], + ): """ :param 
header: Header key to set on the HTTP requests :param token: Header value to set on the HTTP requests :param config: The user-provided configuration as specified by the source's spec + :param options: Additional runtime parameters to be used for string interpolation """ - self._header = InterpolatedString.create(header) - self._token = InterpolatedString.create(token) + self._header = InterpolatedString.create(header, options=options) + self._token = InterpolatedString.create(token, options=options) self._config = config @property @@ -51,12 +58,13 @@ class BearerAuthenticator(AbstractHeaderAuthenticator): `"Authorization": "Bearer "` """ - def __init__(self, token: Union[InterpolatedString, str], config: Config): + def __init__(self, token: Union[InterpolatedString, str], config: Config, **options: Optional[Mapping[str, Any]]): """ :param token: The bearer token :param config: The user-provided configuration as specified by the source's spec + :param options: Additional runtime parameters to be used for string interpolation """ - self._token = InterpolatedString.create(token) + self._token = InterpolatedString.create(token, options=options) self._config = config @property @@ -77,14 +85,21 @@ class BasicHttpAuthenticator(AbstractHeaderAuthenticator): `"Authorization": "Basic "` """ - def __init__(self, username: Union[InterpolatedString, str], config: Config, password: Union[InterpolatedString, str] = ""): + def __init__( + self, + username: Union[InterpolatedString, str], + config: Config, + password: Union[InterpolatedString, str] = "", + **options: Optional[Mapping[str, Any]], + ): """ :param username: The username :param config: The user-provided configuration as specified by the source's spec :param password: The password + :param options: Additional runtime parameters to be used for string interpolation """ - self._username = InterpolatedString.create(username) - self._password = InterpolatedString.create(password) + self._username = 
InterpolatedString.create(username, options=options) + self._password = InterpolatedString.create(password, options=options) self._config = config @property diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index f1b9e49e33cf..47db5130ad96 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -3,7 +3,7 @@ # import logging -from typing import Any, List, Mapping, Tuple +from typing import Any, List, Mapping, Optional, Tuple from airbyte_cdk.models.airbyte_protocol import SyncMode from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker @@ -15,11 +15,13 @@ class CheckStream(ConnectionChecker): Checks the connections by trying to read records from one or many of the streams selected by the developer """ - def __init__(self, stream_names: List[str]): + def __init__(self, stream_names: List[str], **options: Optional[Mapping[str, Any]]): """ :param stream_names: name of streams to read records from + :param options: Additional runtime parameters to be used for string interpolation """ self._stream_names = set(stream_names) + self._options = options def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]: streams = source.streams(config) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py index b668ed73d642..c4b9f4ac5619 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py @@ -4,7 +4,7 @@ import inspect -from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping +OPTIONS_STR = "$options" def create(func, /, *args, 
**keywords): @@ -15,6 +15,7 @@ def create(func, /, *args, **keywords): The interpolation will take in kwargs, and config as parameters that can be accessed through interpolating. If any of the parameters are also create functions, they will also be created. kwargs are propagated to the recursive method calls + :param func: Function :param args: :param keywords: @@ -28,21 +29,12 @@ def newfunc(*fargs, **fkeywords): # config is a special keyword used for interpolation config = all_keywords.pop("config", None) - # options is a special keyword used for interpolation and propagation - if "options" in all_keywords: - options = all_keywords.pop("options") + # $options is a special keyword used for interpolation and propagation + if OPTIONS_STR in all_keywords: + options = all_keywords.get(OPTIONS_STR) else: options = dict() - # create object's partial parameters - fully_created = _create_inner_objects(all_keywords, options) - - # interpolate the parameters - interpolated_keywords = InterpolatedMapping(fully_created).eval(config, **{"options": options}) - interpolated_keywords = {k: v for k, v in interpolated_keywords.items() if v} - - all_keywords.update(interpolated_keywords) - # if config is not none, add it back to the keywords mapping if config is not None: all_keywords["config"] = config diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py index 88b84f1ab54f..eb105803c1fd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py @@ -3,7 +3,7 @@ # import datetime as dt -from typing import Union +from typing import Any, Mapping, Optional, Union from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString @@ -21,24 +21,26 @@ def __init__( datetime_format: str = "", min_datetime: 
Union[InterpolatedString, str] = "", max_datetime: Union[InterpolatedString, str] = "", + **options: Optional[Mapping[str, Any]], ): """ :param datetime: InterpolatedString or string representing the datetime in the format specified by `datetime_format` :param datetime_format: Format of the datetime passed as argument :param min_datetime: InterpolatedString or string representing the min datetime :param max_datetime: InterpolatedString or string representing the max datetime + :param options: Additional runtime parameters to be used for string interpolation """ - self._datetime_interpolator = InterpolatedString.create(datetime) + self._datetime_interpolator = InterpolatedString.create(datetime, options=options) self._datetime_format = datetime_format self._timezone = dt.timezone.utc - self._min_datetime_interpolator = InterpolatedString.create(min_datetime) if min_datetime else None - self._max_datetime_interpolator = InterpolatedString.create(max_datetime) if max_datetime else None + self._min_datetime_interpolator = InterpolatedString.create(min_datetime, options=options) if min_datetime else None + self._max_datetime_interpolator = InterpolatedString.create(max_datetime, options=options) if max_datetime else None - def get_datetime(self, config, **kwargs) -> dt.datetime: + def get_datetime(self, config, **additional_options) -> dt.datetime: """ Evaluates and returns the datetime :param config: The user-provided configuration as specified by the source's spec - :param kwargs: Additional arguments to be passed to the strings for interpolation + :param additional_options: Additional arguments to be passed to the strings for interpolation :return: The evaluated datetime """ # We apply a default datetime format here instead of at instantiation, so it can be set by the parent first @@ -46,15 +48,17 @@ def get_datetime(self, config, **kwargs) -> dt.datetime: if not datetime_format: datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z" - time = 
dt.datetime.strptime(self._datetime_interpolator.eval(config, **kwargs), datetime_format).replace(tzinfo=self._timezone) + time = dt.datetime.strptime(self._datetime_interpolator.eval(config, **additional_options), datetime_format).replace( + tzinfo=self._timezone + ) if self._min_datetime_interpolator: - min_time = dt.datetime.strptime(self._min_datetime_interpolator.eval(config, **kwargs), datetime_format).replace( + min_time = dt.datetime.strptime(self._min_datetime_interpolator.eval(config, **additional_options), datetime_format).replace( tzinfo=self._timezone ) time = max(time, min_time) if self._max_datetime_interpolator: - max_time = dt.datetime.strptime(self._max_datetime_interpolator.eval(config, **kwargs), datetime_format).replace( + max_time = dt.datetime.strptime(self._max_datetime_interpolator.eval(config, **additional_options), datetime_format).replace( tzinfo=self._timezone ) time = min(time, max_time) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py index 798326db8b5a..250d712d26f6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/jello.py @@ -21,12 +21,11 @@ class JelloExtractor: default_transform = "_" - def __init__(self, transform: Union[InterpolatedString, str], config: Config, decoder: Decoder = JsonDecoder(), kwargs=None): + def __init__(self, transform: Union[InterpolatedString, str], config: Config, decoder: Decoder = JsonDecoder()): """ :param transform: The Jello query to evaluate on the decoded response :param config: The user-provided configuration as specified by the source's spec :param decoder: The decoder responsible to transfom the response in a Mapping - :param kwargs: Additional arguments to be passed to the strings for interpolation """ if isinstance(transform, str): @@ -35,9 +34,8 @@ def __init__(self, transform: 
Union[InterpolatedString, str], config: Config, de self._transform = transform self._decoder = decoder self._config = config - self._kwargs = kwargs or dict() def extract_records(self, response: requests.Response) -> List[Record]: response_body = self._decoder.decode(response) - script = self._transform.eval(self._config, **{"kwargs": self._kwargs}) + script = self._transform.eval(self._config) return jello_lib.pyquery(response_body, script) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py index 1ba460fe4520..8f0b123ff895 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py @@ -13,13 +13,15 @@ class RecordFilter: Filter applied on a list of Records """ - def __init__(self, config: Config, condition: str = ""): + def __init__(self, config: Config, condition: str = "", **options: Optional[Mapping[str, Any]]): """ :param config: The user-provided configuration as specified by the source's spec :param condition: The string representing the predicate to filter a record. Records will be removed if evaluated to False + :param options: Additional runtime parameters to be used for string interpolation """ self._config = config self._filter_interpolator = InterpolatedBoolean(condition) + self._options = options def filter_records( self, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py index 59e280a85632..193f0e7576eb 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -17,13 +17,15 @@ class RecordSelector(HttpSelector): records based on a heuristic. 
""" - def __init__(self, extractor: JelloExtractor, record_filter: RecordFilter = None): + def __init__(self, extractor: JelloExtractor, record_filter: RecordFilter = None, **options: Optional[Mapping[str, Any]]): """ :param extractor: The record extractor responsible for extracting records from a response :param record_filter: The record filter responsible for filtering extracted records + :param options: Additional runtime parameters to be used for string interpolation """ self._extractor = extractor self._record_filter = record_filter + self._options = options def select_records( self, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py index def3fb95d290..8eff06e3bd8a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
# -from typing import Any, Final, List +from typing import Any, Final, List, Mapping, Optional from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation from airbyte_cdk.sources.declarative.types import Config @@ -16,26 +16,28 @@ class InterpolatedBoolean: The string will be evaluated as False if it interpolates to a value in {FALSE_VALUES} """ - def __init__(self, condition: str): + def __init__(self, condition: str, **options: Optional[Mapping[str, Any]]): """ :param condition: The string representing the condition to evaluate to a boolean + :param options: Additional runtime parameters to be used for string interpolation """ self._condition = condition self._default = "False" self._interpolation = JinjaInterpolation() + self._options = options - def eval(self, config: Config, **kwargs): + def eval(self, config: Config, **additional_options): """ Interpolates the predicate condition string using the config and other optional arguments passed as parameter. :param config: The user-provided configuration as specified by the source's spec - :param kwargs: Optional parameters used for interpolation + :param additional_options: Optional parameters used for interpolation :return: The interpolated string """ if isinstance(self._condition, bool): return self._condition else: - evaluated = self._interpolation.eval(self._condition, config, self._default, **kwargs) + evaluated = self._interpolation.eval(self._condition, config, self._default, options=self._options, **additional_options) if evaluated in FALSE_VALUES: return False # The presence of a value is generally regarded as truthy, so we treat it as such diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py index bbfa216cb493..fc46cf4f8552 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py +++ 
b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py @@ -2,34 +2,37 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Mapping +from typing import Any, Mapping from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation from airbyte_cdk.sources.declarative.types import Config class InterpolatedMapping: - """ - Wrapper around a Mapping[str, str] where both the keys and values are to be interpolated. - """ + """Wrapper around a Mapping[str, str] where both the keys and values are to be interpolated.""" - def __init__(self, mapping: Mapping[str, str]): + def __init__(self, mapping: Mapping[str, Any], options: Mapping[str, Any]): """ :param mapping: Mapping[str, str] to be evaluated + :param options: Additional runtime parameters to be used for string interpolation """ self._mapping = mapping + self._options = options self._interpolation = JinjaInterpolation() - def eval(self, config: Config, **kwargs): + def eval(self, config: Config, **additional_options): """ Wrapper around a Mapping[str, str] that allows for both keys and values to be interpolated. 
:param config: The user-provided configuration as specified by the source's spec - :param kwargs: Optional parameters used for interpolation + :param additional_options: Optional parameters used for interpolation :return: The interpolated string """ interpolated_values = { - self._interpolation.eval(name, config, **kwargs): self._eval(value, config, **kwargs) for name, value in self._mapping.items() + self._interpolation.eval(name, config, options=self._options, **additional_options): self._eval( + value, config, **additional_options + ) + for name, value in self._mapping.items() } return interpolated_values @@ -37,6 +40,6 @@ def _eval(self, value, config, **kwargs): # The values in self._mapping can be of Any type # We only want to interpolate them if they are strings if type(value) == str: - return self._interpolation.eval(value, config, **kwargs) + return self._interpolation.eval(value, config, options=self._options, **kwargs) else: return value diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py index e9494effd8cf..3c2171a7d9d2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
# -from typing import Optional, Union +from typing import Any, Mapping, Optional, Union from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation from airbyte_cdk.sources.declarative.types import Config @@ -13,14 +13,16 @@ class InterpolatedString: Wrapper around a raw string to be interpolated with the Jinja2 templating engine """ - def __init__(self, string: str, default: Optional[str] = None): + def __init__(self, string: str, *, options: Mapping[str, Any] = {}, default: Optional[str] = None): """ :param string: The string to evalute :param default: The default value to return if the evaluation returns an empty string + :param options: Additional runtime parameters to be used for string interpolation """ self._string = string self._default = default or string self._interpolation = JinjaInterpolation() + self._options = options or {} def eval(self, config: Config, **kwargs): """ @@ -30,7 +32,7 @@ def eval(self, config: Config, **kwargs): :param kwargs: Optional parameters used for interpolation :return: The interpolated string """ - return self._interpolation.eval(self._string, config, self._default, **kwargs) + return self._interpolation.eval(self._string, config, self._default, options=self._options, **kwargs) def __eq__(self, other): if not isinstance(other, InterpolatedString): @@ -41,6 +43,8 @@ def __eq__(self, other): def create( cls, string_or_interpolated: Union["InterpolatedString", str], + *, + options: Mapping[str, Any], ): """ Helper function to obtain an InterpolatedString from either a raw string or an InterpolatedString. @@ -50,6 +54,6 @@ def create( :return: InterpolatedString representing the input string. 
""" if isinstance(string_or_interpolated, str): - return InterpolatedString(string_or_interpolated) + return InterpolatedString(string_or_interpolated, options=options) else: return string_or_interpolated diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolation.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolation.py index 01f28804ed97..6fb8fabb2588 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolation.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/interpolation.py @@ -14,14 +14,14 @@ class Interpolation(ABC): """ @abstractmethod - def eval(self, input_str: str, config: Config, default: Optional[str] = None, **kwargs): + def eval(self, input_str: str, config: Config, default: Optional[str] = None, **additional_options): """ - Interpolates the input string using the config, and kwargs passed as paramter. + Interpolates the input string using the config, and additional options passed as parameter. 
:param input_str: The string to interpolate :param config: The user-provided configuration as specified by the source's spec :param default: Default value to return if the evaluation returns an empty string - :param kwargs: Optional parameters used for interpolation + :param additional_options: Optional parameters used for interpolation :return: The interpolated string """ pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/jinja.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/jinja.py index f0ea55a699cf..883118fb5fdf 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/jinja.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/jinja.py @@ -34,8 +34,8 @@ def __init__(self): self._environment = Environment() self._environment.globals.update(**macros) - def eval(self, input_str: str, config: Config, default: Optional[str] = None, **kwargs): - context = {"config": config, **kwargs} + def eval(self, input_str: str, config: Config, default: Optional[str] = None, **additional_options): + context = {"config": config, **additional_options} try: if isinstance(input_str, str): result = self._eval(input_str, context) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py index 2b0430d5a7ab..6303b05ca1f8 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py @@ -5,10 +5,11 @@ from __future__ import annotations import copy +import enum import importlib from typing import Any, List, Literal, Mapping, Type, Union, get_args, get_origin, get_type_hints -from airbyte_cdk.sources.declarative.create_partial import create +from airbyte_cdk.sources.declarative.create_partial import OPTIONS_STR, create from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation from 
airbyte_cdk.sources.declarative.parsers.class_types_registry import CLASS_TYPES_REGISTRY from airbyte_cdk.sources.declarative.parsers.default_implementation_registry import DEFAULT_IMPLEMENTATIONS_REGISTRY @@ -48,7 +49,7 @@ class DeclarativeComponentFactory: the factory will lookup the `CLASS_TYPES_REGISTRY` and replace the "type" field by "class_name" -> CLASS_TYPES_REGISTRY[type] and instantiate the object from the resulting mapping - If the component definition is a mapping with neighter a "class_name" nor a "type" field, + If the component definition is a mapping with neither a "class_name" nor a "type" field, the factory will do a best-effort attempt at inferring the component type by looking up the parent object's constructor type hints. If the type hint is an interface present in `DEFAULT_IMPLEMENTATIONS_REGISTRY`, then the factory will create an object of it's default implementation. @@ -71,6 +72,28 @@ class DeclarativeComponentFactory: ``` TopLevel(param=ParamType(k="v")) ``` + + Parameters can be passed down from a parent component to its subcomponents using the $options key. + This can be used to avoid repetitions. + ``` + outer: + $options: + MyKey: MyValue + inner: + k2: v2 + ``` + In the example above, if both outer and inner are types with a "MyKey" field, both of them will evaluate to "MyValue". 
+ The value can also be used for string interpolation: + ``` + outer: + $options: + MyKey: MyValue + inner: + k2: "MyKey is {{ options.MyKey }}" + ``` + In this example, outer.inner.k2 will evaluate to "MyKey is MyValue" + """ def __init__(self): @@ -101,8 +124,8 @@ def build(self, class_or_class_name: Union[str, Type], config, **kwargs): class_ = class_or_class_name # create components in options before propagating them - if "options" in kwargs: - kwargs["options"] = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs["options"].items()} + if OPTIONS_STR in kwargs: + kwargs[OPTIONS_STR] = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs[OPTIONS_STR].items()} updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs.items()} return create(class_, config=config, **updated_kwargs) @@ -129,11 +152,11 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class): """ if self.is_object_definition_with_class_name(definition): # propagate kwargs to inner objects - definition["options"] = self._merge_dicts(kwargs.get("options", dict()), definition.get("options", dict())) + definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict())) return self.create_component(definition, config)() elif self.is_object_definition_with_type(definition): # If type is set instead of class_name, get the class_name from the CLASS_TYPES_REGISTRY - definition["options"] = self._merge_dicts(kwargs.get("options", dict()), definition.get("options", dict())) + definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict())) object_type = definition.pop("type") class_name = CLASS_TYPES_REGISTRY[object_type] definition["class_name"] = class_name @@ -145,14 +168,14 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class): # We don't have to instantiate builtin types (eg 
string and dict) because definition is already going to be of that type if expected_type and not self._is_builtin_type(expected_type): definition["class_name"] = expected_type - definition["options"] = self._merge_dicts(kwargs.get("options", dict()), definition.get("options", dict())) + definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict())) return self.create_component(definition, config)() else: return definition elif isinstance(definition, list): return [ self._create_subcomponent( - key, sub, self._merge_dicts(kwargs.get("options", dict()), self._get_subcomponent_options(sub)), config, parent_class + key, sub, self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)), config, parent_class ) for sub in definition ] @@ -161,7 +184,15 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class): if expected_type and not isinstance(definition, expected_type): # call __init__(definition) if definition is not a dict and is not of the expected type # for instance, to turn a string into an InterpolatedString - return expected_type(definition) + options = kwargs.get(OPTIONS_STR, {}) + try: + # enums can't accept options + if issubclass(expected_type, enum.Enum): + return expected_type(definition) + else: + return expected_type(definition, options=options) + except Exception as e: + raise Exception(f"failed to instantiate type {expected_type}. 
{e}") else: return definition @@ -198,7 +229,7 @@ def get_default_type(parameter_name, parent_class): @staticmethod def _get_subcomponent_options(sub: Any): if isinstance(sub, dict): - return sub.get("options", {}) + return sub.get(OPTIONS_STR, {}) else: return {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py index 7ee9ad95016d..287428c97cb9 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -35,6 +35,7 @@ def __init__( authenticator: HttpAuthenticator = None, error_handler: Optional[ErrorHandler] = None, config: Config, + **options: Optional[Mapping[str, Any]], ): """ :param name: Name of the stream. Only used for request/response caching @@ -45,6 +46,7 @@ def __init__( :param authenticator: Authenticator defining how to authenticate to the source :param error_handler: Error handler defining how to detect and handle errors :param config: The user-provided configuration as specified by the source's spec + :param options: Additional runtime parameters to be used for string interpolation """ if request_options_provider is None: request_options_provider = InterpolatedRequestOptionsProvider(config=config) @@ -60,6 +62,7 @@ def __init__( self._request_options_provider = request_options_provider self._error_handler = error_handler or DefaultErrorHandler() self._config = config + self._options = options def get_authenticator(self): return self._authenticator diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py index bba60150e01b..e3bf9e0f0e19 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py 
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py @@ -14,21 +14,30 @@ class InterpolatedRequestInputProvider: Helper class that generically performs string interpolation on the provided dictionary or string input """ - def __init__(self, *, config: Config, request_inputs: Optional[Union[str, Mapping[str, str]]] = None): + def __init__( + self, *, config: Config, request_inputs: Optional[Union[str, Mapping[str, str]]] = None, **options: Optional[Mapping[str, Any]] + ): + """ + :param config: The user-provided configuration as specified by the source's spec + :param request_inputs: The dictionary to interpolate + :param options: Additional runtime parameters to be used for string interpolation + """ + self._config = config if request_inputs is None: request_inputs = {} if isinstance(request_inputs, str): - self._interpolator = InterpolatedString(request_inputs, "") + self._interpolator = InterpolatedString(request_inputs, default="", options=options) else: - self._interpolator = InterpolatedMapping(request_inputs) + self._interpolator = InterpolatedMapping(request_inputs, options=options) def request_inputs( self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None ) -> Mapping[str, Any]: """ Returns the request inputs to set on an outgoing HTTP request + :param stream_state: The stream state :param stream_slice: The stream slice :param next_page_token: The pagination token diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py index 6a41b4cf1b86..c7ba91fec3ad 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py @@ -78,15 +78,17 @@ def __init__( config: Config, 
url_base: str, decoder: Decoder = None, + **options: Optional[Mapping[str, Any]], ): """ - :param page_size: the number of records to request - :param limit_option: the request option to set the limit. Cannot be injected in the path. - :param page_token_option: the request option to set the page token - :param pagination_strategy: Strategy defining how to get the next page token - :param config: connection config - :param url_base: endpoint's base url - :param decoder: decoder to decode the response + :param page_size: The number of records to request + :param limit_option: The request option to set the limit. Cannot be injected in the path. + :param page_token_option: The request option to set the page token + :param pagination_strategy: The strategy defining how to get the next page token + :param config: The user-provided configuration as specified by the source's spec + :param url_base: The endpoint's base url + :param decoder: The decoder to decode the response + :param options: Additional runtime parameters to be used for string interpolation """ if limit_option.inject_into == RequestOptionType.path: raise ValueError("Limit parameter cannot be a path") @@ -97,7 +99,7 @@ def __init__( self._pagination_strategy = pagination_strategy self._token = None if isinstance(url_base, str): - url_base = InterpolatedString(url_base) + url_base = InterpolatedString.create(url_base, options=options) self._url_base = url_base self._decoder = decoder or JsonDecoder() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py index 7f2650effc88..f583a28d6011 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py +++ 
b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py @@ -24,15 +24,17 @@ def __init__( config: Config, stop_condition: Optional[InterpolatedBoolean] = None, decoder: Optional[Decoder] = None, + **options: Optional[Mapping[str, Any]], ): """ :param cursor_value: template string evaluating to the cursor value :param config: connection config :param stop_condition: template string evaluating when to stop paginating :param decoder: decoder to decode the response + :param options: Additional runtime parameters to be used for string interpolation """ if isinstance(cursor_value, str): - cursor_value = InterpolatedString(cursor_value) + cursor_value = InterpolatedString.create(cursor_value, options=options) self._cursor_value = cursor_value self._config = config self._decoder = decoder or JsonDecoder() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 16b742a96e98..58a4bf13cfc9 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -40,6 +40,7 @@ def __init__( record_selector: HttpSelector, paginator: Optional[Paginator] = None, stream_slicer: Optional[StreamSlicer] = SingleSlice(), + **options: Optional[Mapping[str, Any]], ): """ :param name: The stream's name @@ -48,7 +49,7 @@ def __init__( :param record_selector: The record selector :param paginator: The paginator :param stream_slicer: The stream slicer - :param state: The stream state + :param options: Additional runtime parameters to be used for string interpolation """ self._name = name self._primary_key = primary_key diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py index 
73be45040351..8e29bd329c71 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py @@ -3,7 +3,7 @@ # import json -from typing import Any, Mapping +from typing import Any, Mapping, Optional from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader @@ -13,17 +13,15 @@ class JsonSchema(SchemaLoader): """Loads the schema from a json file""" - def __init__(self, file_path: InterpolatedString, name: str, config: Config, **kwargs): + def __init__(self, file_path: InterpolatedString, config: Config, **options: Optional[Mapping[str, Any]]): """ :param file_path: The path to the json file describing the schema - :param name: The stream's name :param config: The user-provided configuration as specified by the source's spec - :param kwargs: Additional arguments to pass to the string interpolation if needed + :param options: Additional arguments to pass to the string interpolation if needed """ self._file_path = file_path self._config = config - self._kwargs = kwargs - self._name = name + self._options = options def get_json_schema(self) -> Mapping[str, Any]: json_schema_path = self._get_json_filepath() @@ -31,4 +29,4 @@ def get_json_schema(self) -> Mapping[str, Any]: return json.loads(f.read()) def _get_json_filepath(self): - return self._file_path.eval(self._config, name=self._name, **self._kwargs) + return self._file_path.eval(self._config, **self._options) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index c7b227301788..a9190cc1fe59 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ 
b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -48,6 +48,7 @@ def __init__( stream_state_field_start: Optional[str] = None, stream_state_field_end: Optional[str] = None, lookback_window: Optional[InterpolatedString] = None, + **options: Optional[Mapping[str, Any]], ): """ :param start_datetime: @@ -61,6 +62,7 @@ def __init__( :param stream_state_field_start: stream slice start time field :param stream_state_field_end: stream slice end time field :param lookback_window: how many days before start_datetime to read data for + :param options: Additional runtime parameters to be used for string interpolation """ self._timezone = datetime.timezone.utc self._interpolation = JinjaInterpolation() @@ -70,11 +72,11 @@ def __init__( self._end_datetime = end_datetime self._step = self._parse_timedelta(step) self._config = config - self._cursor_field = InterpolatedString.create(cursor_field) + self._cursor_field = InterpolatedString.create(cursor_field, options=options) self._start_time_option = start_time_option self._end_time_option = end_time_option - self._stream_slice_field_start = InterpolatedString.create(stream_state_field_start or "start_date") - self._stream_slice_field_end = InterpolatedString.create(stream_state_field_end or "end_date") + self._stream_slice_field_start = InterpolatedString.create(stream_state_field_start or "start_date", options=options) + self._stream_slice_field_end = InterpolatedString.create(stream_state_field_end or "end_date", options=options) self._cursor = None # tracks current datetime self._cursor_end = None # tracks end of current stream slice self._lookback_window = lookback_window diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py index 1466ba1650e1..719612bfa94b 100644 --- 
a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py @@ -2,7 +2,6 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -import ast from typing import Any, Iterable, List, Mapping, Optional, Union from airbyte_cdk.models import SyncMode @@ -24,17 +23,19 @@ def __init__( cursor_field: Union[InterpolatedString, str], config: Config, request_option: Optional[RequestOption] = None, + **options: Optional[Mapping[str, Any]], ): """ :param slice_values: The values to iterate over :param cursor_field: The name of the cursor field :param config: The user-provided configuration as specified by the source's spec :param request_option: The request option to configure the HTTP request + :param options: Additional runtime parameters to be used for string interpolation """ if isinstance(slice_values, str): - slice_values = ast.literal_eval(slice_values) + slice_values = InterpolatedString.create(slice_values, options=options).eval(config) if isinstance(cursor_field, str): - cursor_field = InterpolatedString(cursor_field) + cursor_field = InterpolatedString(cursor_field, options=options) self._cursor_field = cursor_field self._slice_values = slice_values self._config = config diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py index e8eac15970ec..17a65462f8f1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py @@ -35,17 +35,16 @@ class SubstreamSlicer(StreamSlicer): Will populate the state with `parent_stream_slice` and `parent_record` so they can be accessed by other components """ - def __init__( - self, - parent_streams_configs: List[ParentStreamConfig], - ): + def 
__init__(self, parent_streams_configs: List[ParentStreamConfig], **options: Optional[Mapping[str, Any]]): """ :param parent_streams_configs: parent streams to iterate over and their config + :param options: Additional runtime parameters to be used for string interpolation """ if not parent_streams_configs: raise ValueError("SubstreamSlicer needs at least 1 parent stream") self._parent_stream_configs = parent_streams_configs self._cursor = None + self._options = options def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None): cursor = {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py index b39bf57f674b..d28e0941fc26 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py @@ -3,7 +3,7 @@ # from dataclasses import dataclass -from typing import List, Optional, Union +from typing import Any, List, Mapping, Optional, Union import dpath.util from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString @@ -75,7 +75,11 @@ class AddFields(RecordTransformation): value: {{ 2 * 2 }} """ - def __init__(self, fields: List[AddedFieldDefinition]): + def __init__(self, fields: List[AddedFieldDefinition], **options: Optional[Mapping[str, Any]]): + """ + :param fields: Fields to add + :param options: Additional runtime parameters to be used for string interpolation + """ self._fields: List[ParsedAddFieldDefinition] = [] for field in fields: if len(field.path) < 1: @@ -85,7 +89,7 @@ def __init__(self, fields: List[AddedFieldDefinition]): if not isinstance(field.value, str): raise f"Expected a string value for the AddFields transformation: {field}" else: - self._fields.append(ParsedAddFieldDefinition(field.path, InterpolatedString(field.value))) + 
self._fields.append(ParsedAddFieldDefinition(field.path, InterpolatedString.create(field.value, options=options))) else: self._fields.append(ParsedAddFieldDefinition(field.path, field.value)) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/__init__.py index eeefe39bceec..c4f64a971ea0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/__init__.py @@ -5,9 +5,4 @@ from .oauth import Oauth2Authenticator from .token import BasicHttpAuthenticator, MultipleTokenAuthenticator, TokenAuthenticator -__all__ = [ - "BasicHttpAuthenticator", - "Oauth2Authenticator", - "TokenAuthenticator", - "MultipleTokenAuthenticator", -] +__all__ = ["Oauth2Authenticator", "TokenAuthenticator", "MultipleTokenAuthenticator", "BasicHttpAuthenticator"] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abtract_token.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_token.py similarity index 74% rename from airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abtract_token.py rename to airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_token.py index 57a7d5f82d9e..d416499bcafe 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abtract_token.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_token.py @@ -9,9 +9,7 @@ class AbstractHeaderAuthenticator(AuthBase): - """ - Abstract class for header-based authenticators that set a key-value pair in outgoing HTTP headers - """ + """Abstract class for header-based authenticators that add a header to outgoing HTTP requests.""" def __call__(self, request): """Attach the HTTP headers required to authenticate on the 
HTTP request""" @@ -19,7 +17,8 @@ def __call__(self, request): return request def get_auth_header(self) -> Mapping[str, Any]: - """HTTP header to set on the requests""" + """The header to set on outgoing HTTP requests""" + return {self.auth_header: self.token} @property @@ -30,4 +29,4 @@ def auth_header(self) -> str: @property @abstractmethod def token(self) -> str: - """Value of the HTTP header to set on the requests""" + """The header value to set on outgoing HTTP requests""" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/token.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/token.py index b5708f297919..5b15cd923f12 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/token.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/token.py @@ -6,7 +6,7 @@ from itertools import cycle from typing import List -from airbyte_cdk.sources.streams.http.requests_native_auth.abtract_token import AbstractHeaderAuthenticator +from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator class MultipleTokenAuthenticator(AbstractHeaderAuthenticator): diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 6960e3131c75..e538af4b91c1 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.67", + version="0.1.68", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_mapping.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_mapping.py deleted file mode 100644 index 6793031d146a..000000000000 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_mapping.py +++ /dev/null @@ -1,26 +0,0 @@ -# -# 
Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - - -from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping - - -def test(): - d = { - "field": "value", - "field_to_interpolate_from_config": "{{ config['c'] }}", - "field_to_interpolate_from_kwargs": "{{ kwargs['a'] }}", - "a_field": "{{ value_passed_directly }}", - } - config = {"c": "VALUE_FROM_CONFIG"} - kwargs = {"a": "VALUE_FROM_KWARGS"} - mapping = InterpolatedMapping(d) - - value_passed_directly = "ABC" - interpolated = mapping.eval(config, **{"kwargs": kwargs}, value_passed_directly=value_passed_directly) - - assert interpolated["field"] == "value" - assert interpolated["field_to_interpolate_from_config"] == "VALUE_FROM_CONFIG" - assert interpolated["field_to_interpolate_from_kwargs"] == "VALUE_FROM_KWARGS" - assert interpolated["a_field"] == value_passed_directly diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_string.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_string.py deleted file mode 100644 index 8207c253a742..000000000000 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_interpolated_string.py +++ /dev/null @@ -1,26 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
-# - -from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString - -config = {"field": "value"} - - -def test_static_value(): - static_value = "HELLO WORLD" - s = InterpolatedString(static_value) - assert s.eval(config) == "HELLO WORLD" - - -def test_eval_from_config(): - string = "{{ config['field'] }}" - s = InterpolatedString(string) - assert s.eval(config) == "value" - - -def test_eval_from_kwargs(): - string = "{{ kwargs['c'] }}" - kwargs = {"c": "airbyte"} - s = InterpolatedString(string) - assert s.eval(config, **{"kwargs": kwargs}) == "airbyte" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_jello.py b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_jello.py index 7ea35147f7f5..6812e55be11e 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_jello.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_jello.py @@ -10,7 +10,6 @@ from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor config = {"field": "record_array"} -kwargs = {"data_field": "records"} decoder = JsonDecoder() @@ -20,7 +19,6 @@ [ ("test_extract_from_array", "_.data", {"data": [{"id": 1}, {"id": 2}]}, [{"id": 1}, {"id": 2}]), ("test_field_in_config", "_.{{ config['field'] }}", {"record_array": [{"id": 1}, {"id": 2}]}, [{"id": 1}, {"id": 2}]), - ("test_field_in_kwargs", "_.{{ kwargs['data_field'] }}", {"records": [{"id": 1}, {"id": 2}]}, [{"id": 1}, {"id": 2}]), ("test_default", "_{{kwargs['field']}}", [{"id": 1}, {"id": 2}], [{"id": 1}, {"id": 2}]), ( "test_remove_fields_from_records", @@ -40,7 +38,7 @@ ], ) def test(test_name, transform, body, expected_records): - extractor = JelloExtractor(transform, config, decoder, kwargs=kwargs) + extractor = JelloExtractor(transform, config, decoder) response = create_response(body) actual_records = extractor.extract_records(response) diff --git 
a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_selector.py b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_selector.py index 9bdcd0711e5e..0367d7d34a18 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_selector.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_selector.py @@ -39,7 +39,7 @@ def test_record_filter(test_name, transform_template, filter_template, body, exp response = create_response(body) decoder = JsonDecoder() - extractor = JelloExtractor(transform=transform_template, decoder=decoder, config=config, kwargs={}) + extractor = JelloExtractor(transform=transform_template, decoder=decoder, config=config) if filter_template is None: record_filter = None else: diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/interpolation/test_interpolated_mapping.py b/airbyte-cdk/python/unit_tests/sources/declarative/interpolation/test_interpolated_mapping.py index c5e4d4e71d38..9413c79caaf8 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/interpolation/test_interpolated_mapping.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/interpolation/test_interpolated_mapping.py @@ -2,24 +2,34 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
# - +import pytest from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping -def test(): +@pytest.mark.parametrize( + "test_name, key, expected_value", + [ + ("test_field_value", "field", "value"), + ("test_number", "number", 100), + ("test_field_to_interpolate_from_config", "field_to_interpolate_from_config", "VALUE_FROM_CONFIG"), + ("test_field_to_interpolate_from_kwargs", "field_to_interpolate_from_kwargs", "VALUE_FROM_KWARGS"), + ("test_field_to_interpolate_from_options", "field_to_interpolate_from_options", "VALUE_FROM_OPTIONS"), + ("test_key_is_interpolated", "key", "VALUE"), + ], +) +def test(test_name, key, expected_value): d = { "field": "value", "number": 100, "field_to_interpolate_from_config": "{{ config['c'] }}", "field_to_interpolate_from_kwargs": "{{ kwargs['a'] }}", + "field_to_interpolate_from_options": "{{ options['b'] }}", + "{{ options.k }}": "VALUE", } config = {"c": "VALUE_FROM_CONFIG"} kwargs = {"a": "VALUE_FROM_KWARGS"} - mapping = InterpolatedMapping(d) + mapping = InterpolatedMapping(d, options={"b": "VALUE_FROM_OPTIONS", "k": "key"}) interpolated = mapping.eval(config, **{"kwargs": kwargs}) - assert interpolated["field"] == "value" - assert interpolated["number"] == 100 - assert interpolated["field_to_interpolate_from_config"] == "VALUE_FROM_CONFIG" - assert interpolated["field_to_interpolate_from_kwargs"] == "VALUE_FROM_KWARGS" + assert interpolated[key] == expected_value diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/interpolation/test_interpolated_string.py b/airbyte-cdk/python/unit_tests/sources/declarative/interpolation/test_interpolated_string.py index 8207c253a742..b66d13ccc965 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/interpolation/test_interpolated_string.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/interpolation/test_interpolated_string.py @@ -2,25 +2,23 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
# +import pytest from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString config = {"field": "value"} - - -def test_static_value(): - static_value = "HELLO WORLD" - s = InterpolatedString(static_value) - assert s.eval(config) == "HELLO WORLD" - - -def test_eval_from_config(): - string = "{{ config['field'] }}" - s = InterpolatedString(string) - assert s.eval(config) == "value" - - -def test_eval_from_kwargs(): - string = "{{ kwargs['c'] }}" - kwargs = {"c": "airbyte"} - s = InterpolatedString(string) - assert s.eval(config, **{"kwargs": kwargs}) == "airbyte" +options = {"hello": "world"} +kwargs = {"c": "airbyte"} + + +@pytest.mark.parametrize( + "test_name, input_string, expected_value", + [ + ("test_static_value", "HELLO WORLD", "HELLO WORLD"), + ("test_eval_from_options", "{{ options['hello'] }}", "world"), + ("test_eval_from_config", "{{ config['field'] }}", "value"), + ("test_eval_from_kwargs", "{{ kwargs['c'] }}", "airbyte"), + ], +) +def test_interpolated_string(test_name, input_string, expected_value): + s = InterpolatedString.create(input_string, options=options) + assert s.eval(config, **{"kwargs": kwargs}) == expected_value diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_interpolated_string.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_interpolated_string.py deleted file mode 100644 index 498da800c58c..000000000000 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_interpolated_string.py +++ /dev/null @@ -1,54 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
-# - -from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString - -config = {"start": 1234} -kwargs = {"next_page_token": {"next_page_url": "https://airbyte.io"}} - - -def test_value_is_static(): - static_value = "a_static_value" - interpolated_string = InterpolatedString(static_value) - - evaluated_string = interpolated_string.eval(config, **kwargs) - - assert evaluated_string == static_value - - -def test_value_from_config(): - string = "{{ config['start'] }}" - interpolated_string = InterpolatedString(string) - - evaluated_string = interpolated_string.eval(config, **kwargs) - - assert evaluated_string == config["start"] - - -def test_value_from_kwargs(): - string = "{{ next_page_token['next_page_url'] }}" - interpolated_string = InterpolatedString(string) - - evaluated_string = interpolated_string.eval(config, **kwargs) - - assert evaluated_string == "https://airbyte.io" - - -def test_default_value(): - static_value = "{{ config['end'] }}" - default = 5678 - interpolated_string = InterpolatedString(static_value, default) - - evaluated_string = interpolated_string.eval(config, **kwargs) - - assert evaluated_string == default - - -def test_interpolated_default_value(): - static_value = "{{ config['end'] }}" - interpolated_string = InterpolatedString(static_value, "{{ config['start'] }}") - - evaluated_string = interpolated_string.eval(config, **kwargs) - - assert evaluated_string == config["start"] diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py index 35dbdd11a2c2..0b5aabda8ca8 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -41,8 +41,8 @@ def test_http_requester(): requester = HttpRequester( name=name, - url_base=InterpolatedString("{{ config['url'] 
}}"), - path=InterpolatedString("v1/{{ stream_slice['id'] }}"), + url_base=InterpolatedString.create("{{ config['url'] }}", options={}), + path=InterpolatedString.create("v1/{{ stream_slice['id'] }}", options={}), http_method=http_method, request_options_provider=request_options_provider, authenticator=authenticator, diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py index 00babbb382c8..e38ba23f7226 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py @@ -43,7 +43,7 @@ MinMaxDatetime(datetime="2021-01-01", datetime_format="%Y-%m-%d"), MinMaxDatetime(datetime="2021-01-03", datetime_format="%Y-%m-%d"), "1d", - InterpolatedString(""), + InterpolatedString.create("", options={}), "%Y-%m-%d", None, ), diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_create_partial.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_create_partial.py index ad9d82c7ac43..cb239d0eca17 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_create_partial.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_create_partial.py @@ -50,4 +50,4 @@ def test_string_interpolation_through_kwargs(): options = {"name": "airbyte"} partial = create(InterpolatedString, string=s, options=options) interpolated_string = partial() - assert interpolated_string._string == "airbyte" + assert interpolated_string.eval({}) == "airbyte" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index a16eeab45d5e..20a2cf248812 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ 
b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -122,7 +122,7 @@ def test_create_substream_slicer(): transform: "_" stream_A: type: DeclarativeStream - options: + $options: name: "A" primary_key: "id" retriever: "*ref(retriever)" @@ -130,7 +130,7 @@ def test_create_substream_slicer(): schema_loader: "*ref(schema_loader)" stream_B: type: DeclarativeStream - options: + $options: name: "B" primary_key: "id" retriever: "*ref(retriever)" @@ -197,7 +197,7 @@ def test_datetime_stream_slicer(): content = """ stream_slicer: type: DatetimeStreamSlicer - options: + $options: datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" start_datetime: type: MinMaxDatetime @@ -282,11 +282,11 @@ def test_full_config(): class_name: "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream" schema_loader: class_name: airbyte_cdk.sources.declarative.schema.json_schema.JsonSchema - file_path: "./source_sendgrid/schemas/{{ name }}.json" + file_path: "./source_sendgrid/schemas/{{ options.name }}.json" cursor_field: [ ] list_stream: $ref: "*ref(partial_stream)" - options: + $options: name: "lists" primary_key: "id" extractor: @@ -366,12 +366,12 @@ def test_create_requester(): requester: type: HttpRequester path: "/v3/marketing/lists" - options: + $options: name: lists url_base: "https://api.sendgrid.com" authenticator: type: "BasicHttpAuthenticator" - username: "{{ config.apikey }}" + username: "{{ options.name }}" password: "{{ config.apikey }}" request_options_provider: request_parameters: @@ -386,7 +386,7 @@ def test_create_requester(): assert component._path._string == "/v3/marketing/lists" assert component._url_base._string == "https://api.sendgrid.com" assert isinstance(component._authenticator, BasicHttpAuthenticator) - assert component._authenticator._username.eval(input_config) == "verysecrettoken" + assert component._authenticator._username.eval(input_config) == "lists" assert component._authenticator._password.eval(input_config) == "verysecrettoken" assert 
component._method == HttpMethod.GET assert component._request_options_provider._parameter_interpolator._interpolator._mapping["page_size"] == 10 @@ -420,12 +420,12 @@ def test_config_with_defaults(): content = """ lists_stream: type: "DeclarativeStream" - options: + $options: name: "lists" primary_key: id url_base: "https://api.sendgrid.com" schema_loader: - file_path: "./source_sendgrid/schemas/{{name}}.yaml" + file_path: "./source_sendgrid/schemas/{{options.name}}.yaml" retriever: paginator: type: "LimitPaginator" @@ -518,7 +518,7 @@ def test_no_transformations(self): content = f""" the_stream: type: DeclarativeStream - options: + $options: {self.base_options} """ config = parser.parse(content) @@ -530,7 +530,7 @@ def test_remove_fields(self): content = f""" the_stream: type: DeclarativeStream - options: + $options: {self.base_options} transformations: - type: RemoveFields @@ -548,7 +548,7 @@ def test_add_fields(self): content = f""" the_stream: class_name: airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream - options: + $options: {self.base_options} transformations: - type: AddFields diff --git a/airbyte-integrations/connector-templates/source-configuration-based/setup.py.hbs b/airbyte-integrations/connector-templates/source-configuration-based/setup.py.hbs index 8d3351fd1653..de23ef7a33a6 100644 --- a/airbyte-integrations/connector-templates/source-configuration-based/setup.py.hbs +++ b/airbyte-integrations/connector-templates/source-configuration-based/setup.py.hbs @@ -6,7 +6,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk~=0.1.65", + "airbyte-cdk~=0.1.68", ] TEST_REQUIREMENTS = [ diff --git a/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs b/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs index d27f0570ba79..a067a335b73f 100644 --- 
a/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs +++ b/airbyte-integrations/connector-templates/source-configuration-based/source_{{snakeCase name}}/{{snakeCase name}}.yaml.hbs @@ -26,7 +26,7 @@ retriever: class_name: airbyte_cdk.sources.declarative.states.dict_state.DictState customers_stream: type: DeclarativeStream - options: + $options: name: "customers" primary_key: "id" schema_loader: