diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py index 37e69c421e40..8d85ff7196eb 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py @@ -14,9 +14,9 @@ from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import CompositeErrorHandler from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester -from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator -from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator -from airbyte_cdk.sources.declarative.requesters.paginators.offset_paginator import OffsetPaginator +from airbyte_cdk.sources.declarative.requesters.paginators.limit_paginator import LimitPaginator +from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pagination_strategy import CursorPaginationStrategy +from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import OffsetIncrement from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer @@ -29,17 +29,17 @@ "CartesianProductStreamSlicer": CartesianProductStreamSlicer, "CompositeErrorHandler": CompositeErrorHandler, "ConstantBackoffStrategy": ConstantBackoffStrategy, + "CursorPagination": CursorPaginationStrategy, "DatetimeStreamSlicer": DatetimeStreamSlicer, "DeclarativeStream": DeclarativeStream, "DefaultErrorHandler": DefaultErrorHandler, "ExponentialBackoffStrategy": ExponentialBackoffStrategy, "HttpRequester": HttpRequester, - "InterpolatedPaginator": InterpolatedPaginator, "JelloExtractor": JelloExtractor, + "LimitPaginator": LimitPaginator, "ListStreamSlicer": ListStreamSlicer, "MinMaxDatetime": MinMaxDatetime, - "NextPageUrlPaginator": NextPageUrlPaginator, - "OffsetPaginator": OffsetPaginator, + "OffsetIncrement": OffsetIncrement, "RemoveFields": RemoveFields, "TokenAuthenticator": TokenAuthenticator, } diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py index a66f937820a5..00e2a025c323 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py @@ -12,11 +12,13 @@ from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector +from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester +from airbyte_cdk.sources.declarative.requesters.paginators.limit_paginator import RequestOption from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( @@ -35,19 +37,22 @@ from airbyte_cdk.sources.streams.core import Stream DEFAULT_IMPLEMENTATIONS_REGISTRY: Mapping[Type, Type] = { + ConnectionChecker: CheckStream, + Decoder: JsonDecoder, + ErrorHandler: DefaultErrorHandler, + HttpResponseFilter: HttpResponseFilter, + HttpSelector: RecordSelector, + InterpolatedBoolean: InterpolatedBoolean, + InterpolatedRequestOptionsProvider: InterpolatedRequestOptionsProvider, + InterpolatedString: InterpolatedString, + MinMaxDatetime: MinMaxDatetime, + Paginator: NoPagination, + RequestOption: RequestOption, + RequestOptionsProvider: InterpolatedRequestOptionsProvider, Requester: HttpRequester, Retriever: SimpleRetriever, SchemaLoader: JsonSchema, - HttpSelector: RecordSelector, - ConnectionChecker: CheckStream, - ErrorHandler: DefaultErrorHandler, - Decoder: JsonDecoder, State: DictState, - StreamSlicer: SingleSlice, - RequestOptionsProvider: InterpolatedRequestOptionsProvider, - Paginator: NoPagination, - HttpResponseFilter: HttpResponseFilter, Stream: DeclarativeStream, - MinMaxDatetime: MinMaxDatetime, - InterpolatedString: InterpolatedString, + StreamSlicer: SingleSlice, } diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py index cf8063fba5c4..8428a3d50cda 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/interpolated_request_input_provider.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, Mapping, Union +from typing import Any, Mapping from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString @@ -26,7 +26,7 @@ def __init__(self, *, config, request_inputs=None): def request_inputs( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> Union[Mapping, str]: + ) -> Mapping[str, Any]: kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token} interpolated_value = self._interpolator.eval(self._config, **kwargs) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/conditional_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/conditional_paginator.py deleted file mode 100644 index e7c6254be15d..000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/conditional_paginator.py +++ /dev/null @@ -1,41 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from typing import Any, List, Mapping, Optional - -import requests -from airbyte_cdk.sources.declarative.decoders.decoder import Decoder -from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean -from airbyte_cdk.sources.declarative.states.dict_state import DictState - - -class ConditionalPaginator: - """ - A paginator that performs pagination by incrementing a page number and stops based on a provided stop condition. - """ - - def __init__(self, stop_condition: str, state: DictState, decoder: Decoder, config): - self._stop_condition_interpolator = InterpolatedBoolean(stop_condition) - self._state: DictState = state - self._decoder = decoder - self._config = config - - def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: - decoded_response = self._decoder.decode(response) - headers = response.headers - should_stop = self._stop_condition_interpolator.eval( - self._config, decoded_response=decoded_response, headers=headers, last_records=last_records - ) - - if should_stop: - return None - next_page = self._get_page() + 1 - self._update_page_state(next_page) - return {"page": next_page} - - def _get_page(self): - return self._state.get_state("page") - - def _update_page_state(self, page): - self._state.update_state(**{"page": page}) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/interpolated_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/interpolated_paginator.py deleted file mode 100644 index 55a0a9cc9bfe..000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/interpolated_paginator.py +++ /dev/null @@ -1,31 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from typing import Any, List, Mapping, Optional - -import requests -from airbyte_cdk.sources.declarative.decoders.decoder import Decoder -from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder -from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping -from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation -from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator -from airbyte_cdk.sources.declarative.types import Config - - -class InterpolatedPaginator(Paginator): - def __init__(self, *, next_page_token_template: Mapping[str, str], config: Config, decoder: Optional[Decoder] = None): - self._next_page_token_template = InterpolatedMapping(next_page_token_template, JinjaInterpolation()) - self._decoder = decoder or JsonDecoder() - self._config = config - - def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: - decoded_response = self._decoder.decode(response) - headers = response.headers - interpolated_values = self._next_page_token_template.eval( - self._config, decoded_response=decoded_response, headers=headers, last_records=last_records - ) - - non_null_tokens = {k: v for k, v in interpolated_values.items() if v is not None} - - return non_null_tokens if non_null_tokens else None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py new file mode 100644 index 000000000000..4603103a41e6 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py @@ -0,0 +1,139 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Any, List, Mapping, Optional + +import requests +from airbyte_cdk.sources.declarative.decoders.decoder import Decoder +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.paginators.pagination_strategy import PaginationStrategy +from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType +from airbyte_cdk.sources.declarative.types import Config + + +class LimitPaginator(Paginator): + """ + Limit paginator. + Requests pages of results with a fixed size until the pagination strategy no longer returns a next_page_token + + Examples: + 1. + * fetches up to 10 records at a time by setting the "limit" request param to 10 + * updates the request path with "{{ decoded_response._metadata.next }}" + paginator: + type: "LimitPaginator" + limit_value: 10 + limit_option: + option_type: request_parameter + field_name: page_size + page_token_option: + option_type: path + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ decoded_response._metadata.next }}" + ` + + 2. + * fetches up to 5 records at a time by setting the "page_size" header to 5 + * increments a record counter and set the request parameter "offset" to the value of the counter + ` + paginator: + type: "LimitPaginator" + limit_value: 5 + limit_option: + option_type: header + field_name: page_size + pagination_strategy: + type: "OffsetIncrement" + page_token: + option_type: "request_parameter" + field_name: "offset" + ` + + 3. + * fetches up to 5 records at a time by setting the "page_size" request param to 5 + * increments a page counter and set the request parameter "page" to the value of the counter + ` + paginator: + type: "LimitPaginator" + limit_value: 5 + limit_option: + option_type: request_parameter + field_name: page_size + pagination_strategy: + type: "PageIncrement" + page_token: + option_type: "request_parameter" + field_name: "page" + """ + + def __init__( + self, + page_size: int, + limit_option: RequestOption, + page_token_option: RequestOption, + pagination_strategy: PaginationStrategy, + config: Config, + url_base: str, + decoder: Decoder = None, + ): + """ + :param page_size: the number of records to request + :param limit_option: the request option to set the limit. Cannot be injected in the path. + :param page_token_option: the request option to set the page token + :param pagination_strategy: Strategy defining how to get the next page token + :param config: connection config + :param url_base: endpoint's base url + :param decoder: decoder to decode the response + """ + if limit_option.inject_into == RequestOptionType.path: + raise ValueError("Limit parameter cannot be a path") + self._page_size = page_size + self._config = config + self._limit_option = limit_option + self._page_token_option = page_token_option + self._pagination_strategy = pagination_strategy + self._token = None + if isinstance(url_base, str): + url_base = InterpolatedString(url_base) + self._url_base = url_base + self._decoder = decoder or JsonDecoder() + + def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: + self._token = self._pagination_strategy.next_page_token(response, last_records) + if self._token: + return {"next_page_token": self._token} + else: + return None + + def path(self): + if self._token and self._page_token_option.inject_into == RequestOptionType.path: + # Replace url base to only return the path + return str(self._token).replace(self._url_base.eval(self._config), "") + else: + return None + + def request_params(self) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.request_parameter) + + def request_headers(self) -> Mapping[str, str]: + return self._get_request_options(RequestOptionType.header) + + def request_body_data(self) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.body_data) + + def request_body_json(self) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.body_json) + + def _get_request_options(self, option_type) -> Mapping[str, Any]: + options = {} + if self._page_token_option.inject_into == option_type: + if option_type != RequestOptionType.path and self._token: + options[self._page_token_option.field_name] = self._token + if self._limit_option.inject_into == option_type: + if option_type != RequestOptionType.path: + options[self._limit_option.field_name] = self._page_size + return options diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/next_page_url_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/next_page_url_paginator.py deleted file mode 100644 index 89f76eb34e04..000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/next_page_url_paginator.py +++ /dev/null @@ -1,39 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from typing import Any, List, Mapping, Optional - -import requests -from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator -from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator -from airbyte_cdk.sources.declarative.types import Config - - -class NextPageUrlPaginator(Paginator): - """ - A paginator wrapper that delegates to an inner paginator and removes the base url from the next_page_token to only return the path to the next page - """ - - def __init__( - self, - url_base: str = None, - next_page_token_template: Optional[Mapping[str, str]] = None, - config: Optional[Config] = None, - ): - """ - :param url_base: url base to remove from the token - :param interpolated_paginator: optional paginator to delegate to - :param next_page_token_template: optional mapping to delegate to if interpolated_paginator is None - :param config: connection config - """ - - self._url_base = url_base - self._interpolated_paginator = InterpolatedPaginator(next_page_token_template=next_page_token_template, config=config) - - def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: - next_page_token = self._interpolated_paginator.next_page_token(response, last_records) - if next_page_token: - return {k: v.replace(self._url_base, "") for k, v in next_page_token.items() if v} - else: - return None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py index dafe5b42ba9d..ed593f2582dd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py @@ -2,12 +2,27 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, List, Mapping, Optional +from typing import Any, List, Mapping, Optional, Union import requests from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator class NoPagination(Paginator): - def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: + def path(self) -> Optional[str]: return None + + def request_params(self) -> Mapping[str, Any]: + return {} + + def request_headers(self) -> Mapping[str, str]: + return {} + + def request_body_data(self) -> Union[Mapping[str, Any], str]: + return {} + + def request_body_json(self) -> Mapping[str, Any]: + return {} + + def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]: + return {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/offset_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/offset_paginator.py deleted file mode 100644 index 11ade15b8f33..000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/offset_paginator.py +++ /dev/null @@ -1,31 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from typing import Any, List, Mapping, Optional - -import requests -from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator -from airbyte_cdk.sources.declarative.states.dict_state import DictState - - -class OffsetPaginator(Paginator): - def __init__(self, page_size: int, state: Optional[DictState] = None, offset_key: str = "offset"): - self._limit = page_size - self._state = state or DictState() - self._offsetKey = offset_key - self._update_state_with_offset(0) - - def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: - if not last_records or len(last_records) < self._limit: - return None - offset = self._get_offset() + self._limit - token_map = {self._offsetKey: offset} - self._update_state_with_offset(offset) - return token_map - - def _update_state_with_offset(self, offset): - self._state.update_state(**{self._offsetKey: offset}) - - def _get_offset(self): - return self._state.get_state(self._offsetKey) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/pagination_strategy.py new file mode 100644 index 000000000000..fcd987dbc247 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/pagination_strategy.py @@ -0,0 +1,24 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from abc import abstractmethod +from typing import Any, List, Mapping, Optional + +import requests + + +class PaginationStrategy: + """ + Defines how to get the next page token + """ + + @abstractmethod + def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]: + """ + + :param response: response to process + :param last_records: records extracted from the response + :return: next page token. Returns None if there are no more pages to fetch + """ + pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py index 7bd42c365a98..50d862f50289 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py @@ -11,4 +11,49 @@ class Paginator(ABC): @abstractmethod def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: + """ + + :param response: the response to process + :param last_records: the records extracted from the response + :return: A mapping {"next_page_token": } for the next page from the input response object. Returning None means there are no more pages to read in this response. + """ + pass + + @abstractmethod + def path(self) -> Optional[str]: + """ + :return: path to hit to fetch the next request. Returning None means the path does not need to be updated + """ + pass + + @abstractmethod + def request_params(self) -> Mapping[str, Any]: + """ + + :return: the request parameters to set to fetch the next page + """ + pass + + @abstractmethod + def request_headers(self) -> Mapping[str, str]: + """ + + :return: the request headers to set to fetch the next page + """ + pass + + @abstractmethod + def request_body_data(self) -> Mapping[str, Any]: + """ + + :return: the request body data to set to fetch the next page + """ + pass + + @abstractmethod + def request_body_json(self) -> Mapping[str, Any]: + """ + + :return: the request body to set (as a json object) to fetch the next page + """ pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/__init__.py new file mode 100644 index 000000000000..1100c1c58cf5 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py new file mode 100644 index 000000000000..a3089940f906 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py @@ -0,0 +1,52 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Any, List, Mapping, Optional, Union + +import requests +from airbyte_cdk.sources.declarative.decoders.decoder import Decoder +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder +from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.paginators.pagination_strategy import PaginationStrategy +from airbyte_cdk.sources.declarative.types import Config + + +class CursorPaginationStrategy(PaginationStrategy): + """ + Pagination strategy that evaluates an interpolated string to define the next page token + """ + + def __init__( + self, + cursor_value: Union[InterpolatedString, str], + config: Config, + stop_condition: Optional[InterpolatedBoolean] = None, + decoder: Decoder = None, + ): + """ + + :param cursor_value: template string evaluating to the cursor value + :param config: connection config + :param stop_condition: template string evaluating when to stop paginating + :param decoder: decoder to decode the response + """ + if isinstance(cursor_value, str): + cursor_value = InterpolatedString(cursor_value) + self._cursor_value = cursor_value + self._config = config + self._decoder = decoder or JsonDecoder() + self._stop_condition = stop_condition + + def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]: + decoded_response = self._decoder.decode(response) + headers = response.headers + if self._stop_condition: + should_stop = self._stop_condition.eval( + self._config, decoded_response=decoded_response, headers=headers, last_records=last_records + ) + if should_stop: + return None + token = self._cursor_value.eval(config=self._config, last_records=last_records, decoded_response=self._decoder.decode(response)) + return token if token else None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py new file mode 100644 index 000000000000..2894652dcfae --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py @@ -0,0 +1,25 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Any, List, Mapping, Optional + +import requests +from airbyte_cdk.sources.declarative.requesters.paginators.pagination_strategy import PaginationStrategy + + +class OffsetIncrement(PaginationStrategy): + """ + Pagination strategy that returns the number of records reads so far and returns it as the next page token + """ + + def __init__(self, page_size: int): + self._offset = 0 + self._page_size = page_size + + def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]: + if len(last_records) < self._page_size: + return None + else: + self._offset += len(last_records) + return self._offset diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py new file mode 100644 index 000000000000..b17c22ae7497 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py @@ -0,0 +1,25 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Any, List, Mapping, Optional + +import requests +from airbyte_cdk.sources.declarative.requesters.paginators.pagination_strategy import PaginationStrategy + + +class PageIncrement(PaginationStrategy): + """ + Pagination strategy that returns the number of pages reads so far and returns it as the next page token + """ + + def __init__(self, page_size: int): + self._page_size = page_size + self._offset = 0 + + def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]: + if len(last_records) < self._page_size: + return None + else: + self._offset += 1 + return self._offset diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_option.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_option.py new file mode 100644 index 000000000000..fbcd1bb47798 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_option.py @@ -0,0 +1,48 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from enum import Enum +from typing import Optional + + +class RequestOptionType(Enum): + """ + Describes where to set a value on a request + """ + + request_parameter = "request_parameter" + header = "header" + path = "path" + body_data = "body_data" + body_json = "body_json" + + +class RequestOption: + """ + Describes an option to set on a request + """ + + def __init__(self, inject_into: RequestOptionType, field_name: Optional[str] = None): + """ + :param inject_into: where to set the value + :param field_name: field name to set. None if option_type == path. Required otherwise. + """ + self._option_type = inject_into + self._field_name = field_name + if self._option_type == RequestOptionType.path: + if self._field_name is not None: + raise ValueError(f"RequestOption with path cannot have a field name. Get {field_name}") + elif self._field_name is None: + raise ValueError(f"RequestOption expected field name for type {self._option_type}") + + @property + def inject_into(self) -> RequestOptionType: + return self._option_type + + @property + def field_name(self) -> Optional[str]: + return self._field_name + + def is_path(self): + return self._option_type == RequestOptionType.path diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py index 1fcb5fe58890..521f33334491 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py @@ -15,7 +15,7 @@ def __init__(self, *, config, request_parameters=None, request_headers=None, req if request_headers is None: request_headers = {} if request_body_data is None: - request_body_data = "" + request_body_data = {} if request_body_json is None: request_body_json = {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index ea4332214d3e..ce46fa4be270 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -97,7 +97,25 @@ def request_headers( Authentication headers will overwrite any overlapping headers returned from this method. """ # Warning: use self.state instead of the stream_state passed as argument! - return self._requester.request_headers(self.state, stream_slice, next_page_token) + return self._get_request_options(stream_slice, next_page_token, self._requester.request_headers, self._paginator.request_headers) + + def _get_request_options(self, stream_slice: Mapping[str, Any], next_page_token: Mapping[str, Any], requester_method, paginator_method): + """ + Get the request_option from the requester and from the paginator + Raise a ValueError if there's a key collision + Returned merged mapping otherwise + :param stream_slice: + :param next_page_token: + :param requester_method: + :param paginator_method: + :return: + """ + requester_mapping = requester_method(self.state, stream_slice, next_page_token) + paginator_mapping = paginator_method() + keys_intersection = set(requester_mapping.keys()) & set(paginator_mapping.keys()) + if keys_intersection: + raise ValueError(f"Duplicate keys found: {keys_intersection}") + return {**requester_mapping, **paginator_mapping} def request_body_data( self, @@ -115,7 +133,18 @@ def request_body_data( At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. """ # Warning: use self.state instead of the stream_state passed as argument! - return self._requester.request_body_data(self.state, stream_slice, next_page_token) + base_body_data = self._requester.request_body_data(self.state, stream_slice, next_page_token) + if isinstance(base_body_data, str): + paginator_body_data = self._paginator.request_body_data() + if paginator_body_data: + raise ValueError( + f"Cannot combine requester's body data= {base_body_data} with paginator's body_data: {paginator_body_data}" + ) + else: + return base_body_data + return self._get_request_options( + stream_slice, next_page_token, self._requester.request_body_data, self._paginator.request_body_data + ) def request_body_json( self, @@ -129,7 +158,9 @@ def request_body_json( At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. """ # Warning: use self.state instead of the stream_state passed as argument! - return self._requester.request_body_json(self.state, stream_slice, next_page_token) + return self._get_request_options( + stream_slice, next_page_token, self._requester.request_body_json, self._paginator.request_body_json + ) def request_kwargs( self, @@ -148,7 +179,20 @@ def request_kwargs( def path( self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> str: - return self._requester.get_path(stream_state=self.state, stream_slice=stream_slice, next_page_token=next_page_token) + """ + Return the path the submit the next request to. + If the paginator points to a path, follow it, else return the requester's path + :param stream_state: + :param stream_slice: + :param next_page_token: + :return: + """ + # Warning: use self.state instead of the stream_state passed as argument! + paginator_path = self._paginator.path() + if paginator_path: + return paginator_path + else: + return self._requester.get_path(stream_state=self.state, stream_slice=stream_slice, next_page_token=next_page_token) def request_params( self, @@ -162,7 +206,7 @@ def request_params( E.g: you might want to define query parameters for paging if next_page_token is not None. """ # Warning: use self.state instead of the stream_state passed as argument! - return self._requester.request_params(self.state, stream_slice, next_page_token) + return self._get_request_options(stream_slice, next_page_token, self._requester.request_params, self._paginator.request_params) @property def cache_filename(self): diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_conditional_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_conditional_paginator.py deleted file mode 100644 index 0ee5aee91345..000000000000 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_conditional_paginator.py +++ /dev/null @@ -1,41 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import json - -import pytest -import requests -from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder -from airbyte_cdk.sources.declarative.requesters.paginators.conditional_paginator import ConditionalPaginator -from airbyte_cdk.sources.declarative.states.dict_state import DictState - - -@pytest.mark.parametrize( - "test_name, stop_condition_template, expected_next_page_token, expected_page", - [ - ("test_stop_pagination_from_response_body", "{{ not decoded_response['accounts'] }}", None, 1), - ("test_stop_pagination_from_config", "{{ config['response_override'] == decoded_response['_metadata']['content'] }}", None, 1), - ("test_continue_pagination_from_response_body", "{{ decoded_response['end'] == decoded_response['total'] - 1 }}", {"page": 2}, 2), - ("test_continue_pagination_from_response_headers", "{{ decoded_response['headers']['has_more'] }}", {"page": 2}, 2), - ("test_continue_pagination_from_last_records", "{{ last_records[-1]['more_records'] == False }}", {"page": 2}, 2), - ("test_continue_pagination_for_empty_dict_evaluates_false", "{{ decoded_response['characters'] }}", {"page": 2}, 2), - ], -) -def test_interpolated_request_header(test_name, stop_condition_template, expected_next_page_token, expected_page): - state = DictState() - state.update_state(page=1) - decoder = JsonDecoder() - config = {"response_override": "stop_if_you_see_me"} - - response = requests.Response() - response.headers = {"has_more": True} - response_body = {"_metadata": {"content": "stop_if_you_see_me"}, "accounts": [], "end": 99, "total": 200, "characters": {}} - response._content = json.dumps(response_body).encode("utf-8") - last_records = [{"id": 0, "more_records": True}, {"id": 1, "more_records": True}] - - paginator = ConditionalPaginator(stop_condition_template, state, decoder, config) - next_page_token = paginator.next_page_token(response, last_records) - - assert next_page_token == expected_next_page_token - assert state.get_state("page") == expected_page diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py new file mode 100644 index 000000000000..507e2b3391ca --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py @@ -0,0 +1,38 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import json + +import pytest +import requests +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder +from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean +from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pagination_strategy import CursorPaginationStrategy + + +@pytest.mark.parametrize( + "test_name, template_string, stop_condition, expected_token", + [ + ("test_static_token", "token", None, "token"), + ("test_token_from_config", "{{ config.config_key }}", None, "config_value"), + ("test_token_from_last_record", "{{ last_records[-1].id }}", None, 1), + ("test_token_from_decoded_response", "{{ decoded_response._metadata.content }}", None, "content_value"), + ("test_token_not_found", "{{ decoded_response.invalid_key }}", None, None), + ("test_static_token_with_stop_condition_false", "token", InterpolatedBoolean("{{False}}"), "token"), + ("test_static_token_with_stop_condition_true", "token", InterpolatedBoolean("{{True}}"), None), + ], +) +def test_cursor_pagination_strategy(test_name, template_string, stop_condition, expected_token): + decoder = JsonDecoder() + config = {"config_key": "config_value"} + strategy = CursorPaginationStrategy(template_string, config, stop_condition, decoder) + + response = requests.Response() + response.headers = {"has_more": True} + response_body = {"_metadata": {"content": "content_value"}, "accounts": [], "end": 99, "total": 200, "characters": {}} + response._content = json.dumps(response_body).encode("utf-8") + last_records = [{"id": 0, "more_records": True}, {"id": 1, "more_records": True}] + + token = strategy.next_page_token(response, last_records) + assert expected_token == token diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_interpolated_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_interpolated_paginator.py deleted file mode 100644 index 98b3e481daca..000000000000 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_interpolated_paginator.py +++ /dev/null @@ -1,46 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import json - -import pytest -import requests -from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder -from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator - -config = {"option": "OPTION"} - -response = requests.Response() -response.headers = {"A_HEADER": "HEADER_VALUE"} -response_body = {"next_page_cursor": 12345} -response._content = json.dumps(response_body).encode("utf-8") -last_responses = [{"id": 0}] -decoder = JsonDecoder() - - -@pytest.mark.parametrize( - "test_name, next_page_token_template, expected_next_page_token", - [ - ("test_value_is_static", {"cursor": "a_static_value"}, {"cursor": "a_static_value"}), - ( - "test_value_depends_response_body", - {"cursor": "{{ decoded_response['next_page_cursor'] }}"}, - {"cursor": response_body["next_page_cursor"]}, - ), - ("test_value_depends_response_header", {"cursor": "{{ headers['A_HEADER'] }}"}, {"cursor": response.headers["A_HEADER"]}), - ("test_value_depends_on_last_responses", {"cursor": "{{ last_records[-1]['id'] }}"}, {"cursor": 0}), - ( - "test_name_is_interpolated", - {"{{ decoded_response['next_page_cursor'] }}": "a_static_value"}, - {response_body["next_page_cursor"]: "a_static_value"}, - ), - ("test_token_is_none_if_field_not_found", {"cursor": "{{ decoded_response['not_next_page_cursor'] }}"}, None), - ], -) -def test_interpolated_paginator(test_name, next_page_token_template, expected_next_page_token): - paginator = InterpolatedPaginator(next_page_token_template=next_page_token_template, decoder=decoder, config=config) - - actual_next_page_token = paginator.next_page_token(response, last_responses) - - assert expected_next_page_token == actual_next_page_token diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_limit_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_limit_paginator.py new file mode 100644 index 000000000000..3323db48337a --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_limit_paginator.py @@ -0,0 +1,141 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import json + +import pytest +import requests +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder +from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean +from airbyte_cdk.sources.declarative.requesters.paginators.limit_paginator import LimitPaginator, RequestOption, RequestOptionType +from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pagination_strategy import CursorPaginationStrategy + + +@pytest.mark.parametrize( + "test_name, page_token_request_option, stop_condition, expected_updated_path, expected_request_params, expected_headers, expected_body_data, expected_body_json, last_records, expected_next_page_token", + [ + ( + "test_limit_paginator_path", + RequestOption(inject_into=RequestOptionType.path), + None, + "/next_url", + {"limit": 2}, + {}, + {}, + {}, + [{"id": 0}, {"id": 1}], + {"next_page_token": "https://airbyte.io/next_url"}, + ), + ( + "test_limit_paginator_request_param", + RequestOption(inject_into=RequestOptionType.request_parameter, field_name="from"), + None, + None, + {"limit": 2, "from": "https://airbyte.io/next_url"}, + {}, + {}, + {}, + [{"id": 0}, {"id": 1}], + {"next_page_token": "https://airbyte.io/next_url"}, + ), + ( + "test_limit_paginator_no_token", + RequestOption(inject_into=RequestOptionType.request_parameter, field_name="from"), + InterpolatedBoolean("{{True}}"), + None, + {"limit": 2}, + {}, + {}, + {}, + [{"id": 0}, {"id": 1}], + None, + ), + ( + "test_limit_paginator_cursor_header", + RequestOption(inject_into=RequestOptionType.header, field_name="from"), + None, + None, + {"limit": 2}, + {"from": "https://airbyte.io/next_url"}, + {}, + {}, + [{"id": 0}, {"id": 1}], + {"next_page_token": "https://airbyte.io/next_url"}, + ), + ( + "test_limit_paginator_cursor_body_data", + RequestOption(inject_into=RequestOptionType.body_data, field_name="from"), + None, + None, + {"limit": 2}, + {}, + {"from": "https://airbyte.io/next_url"}, + {}, + [{"id": 0}, {"id": 1}], + {"next_page_token": "https://airbyte.io/next_url"}, + ), + ( + "test_limit_paginator_cursor_body_json", + RequestOption(inject_into=RequestOptionType.body_json, field_name="from"), + None, + None, + {"limit": 2}, + {}, + {}, + {"from": "https://airbyte.io/next_url"}, + [{"id": 0}, {"id": 1}], + {"next_page_token": "https://airbyte.io/next_url"}, + ), + ], +) +def test_limit_paginator( + test_name, + page_token_request_option, + stop_condition, + expected_updated_path, + expected_request_params, + expected_headers, + expected_body_data, + expected_body_json, + last_records, + expected_next_page_token, +): + limit_request_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="limit") + cursor_value = "{{ decoded_response.next }}" + url_base = "https://airbyte.io" + config = {} + strategy = CursorPaginationStrategy(cursor_value, stop_condition=stop_condition, decoder=JsonDecoder(), config=config) + paginator = LimitPaginator(2, limit_request_option, page_token_request_option, strategy, config, url_base) + + response = requests.Response() + response.headers = {"A_HEADER": "HEADER_VALUE"} + response_body = {"next": "https://airbyte.io/next_url"} + response._content = json.dumps(response_body).encode("utf-8") + + actual_next_page_token = paginator.next_page_token(response, last_records) + actual_next_path = paginator.path() + actual_request_params = paginator.request_params() + actual_headers = paginator.request_headers() + actual_body_data = paginator.request_body_data() + actual_body_json = paginator.request_body_json() + assert actual_next_page_token == expected_next_page_token + assert actual_next_path == expected_updated_path + assert actual_request_params == expected_request_params + assert actual_headers == expected_headers + assert actual_body_data == expected_body_data + assert actual_body_json == expected_body_json + + +def test_limit_cannot_be_set_in_path(): + limit_request_option = RequestOption(inject_into=RequestOptionType.path) + page_token_request_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="offset") + cursor_value = "{{ decoded_response.next }}" + url_base = "https://airbyte.io" + config = {} + strategy = CursorPaginationStrategy(cursor_value, config) + try: + LimitPaginator(2, limit_request_option, page_token_request_option, strategy, config, url_base) + assert False + except ValueError: + pass diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_next_page_url_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_next_page_url_paginator.py deleted file mode 100644 index aba00d9c8439..000000000000 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_next_page_url_paginator.py +++ /dev/null @@ -1,41 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import json - -import requests -from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder -from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator - -config = {"option": "OPTION"} -response = requests.Response() -response.headers = {"A_HEADER": "HEADER_VALUE"} -response_body = {"_metadata": {"next": "https://airbyte.io/next_url"}} -response._content = json.dumps(response_body).encode("utf-8") -last_responses = [{"id": 0}] -decoder = JsonDecoder() - - -def test_value_depends_response_body(): - next_page_tokens = {"next_page_url": "{{ decoded_response['_metadata']['next'] }}"} - paginator = create_paginator(next_page_tokens) - - next_page_token = paginator.next_page_token(response, last_responses) - - assert next_page_token == {"next_page_url": "next_url"} - - -def test_no_next_page_found(): - next_page_tokens = {"next_page_url": "{{ decoded_response['_metadata']['next'] }}"} - paginator = create_paginator(next_page_tokens) - - r = requests.Response() - r._content = json.dumps({"data": []}).encode("utf-8") - next_page_token = paginator.next_page_token(r, last_responses) - - assert next_page_token is None - - -def create_paginator(template): - return NextPageUrlPaginator("https://airbyte.io/", next_page_token_template=template, config=config) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py index 7b3f8c4c68dc..b9fcd7af21fc 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py @@ -9,4 +9,4 @@ def test(): paginator = NoPagination() next_page_token = paginator.next_page_token(requests.Response(), []) - assert next_page_token is None + assert next_page_token == {} diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py new file mode 100644 index 000000000000..866ae756427e --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py @@ -0,0 +1,32 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import json + +import pytest +import requests +from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import OffsetIncrement + + +@pytest.mark.parametrize( + "test_name, page_size, expected_next_page_token, expected_offset", + [ + ("test_same_page_size", 2, 2, 2), + ("test_larger_page_size", 3, None, 0), + ], +) +def test_offset_increment_paginator_strategy(test_name, page_size, expected_next_page_token, expected_offset): + paginator_strategy = OffsetIncrement(page_size) + assert paginator_strategy._offset == 0 + + response = requests.Response() + + response.headers = {"A_HEADER": "HEADER_VALUE"} + response_body = {"next": "https://airbyte.io/next_url"} + response._content = json.dumps(response_body).encode("utf-8") + last_records = [{"id": 0}, {"id": 1}] + + next_page_token = paginator_strategy.next_page_token(response, last_records) + assert expected_next_page_token == next_page_token + assert expected_offset == paginator_strategy._offset diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_paginator.py deleted file mode 100644 index 9d4ecef26b7a..000000000000 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_paginator.py +++ /dev/null @@ -1,47 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import requests -from airbyte_cdk.sources.declarative.requesters.paginators.offset_paginator import OffsetPaginator -from airbyte_cdk.sources.declarative.states.dict_state import DictState - -response = requests.Response() - -tag = "cursor" -last_responses = [{"id": 0}, {"id": 1}] -state = DictState() - - -def test_return_none_if_fewer_records_than_limit(): - limit = 5 - paginator = OffsetPaginator(limit, state, tag) - - assert paginator._get_offset() == 0 - - next_page_token = paginator.next_page_token(response, last_responses) - - assert next_page_token is None - - -def test_return_next_offset_limit_1(): - limit = 1 - paginator = OffsetPaginator(limit, state, tag) - - next_page_token = paginator.next_page_token(response, last_responses) - - assert next_page_token == {tag: 1} - assert paginator._get_offset() == 1 - - -def test_return_next_offset_limit_2(): - limit = 2 - paginator = OffsetPaginator(limit, state, tag) - - next_page_token = paginator.next_page_token(response, last_responses) - - assert next_page_token == {tag: 2} - assert paginator._get_offset() == 2 - - next_page_token = paginator.next_page_token(response, [{"id": 2}]) - assert next_page_token is None diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py new file mode 100644 index 000000000000..7d50dfb105aa --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py @@ -0,0 +1,32 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import json + +import pytest +import requests +from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement + + +@pytest.mark.parametrize( + "test_name, page_size, expected_next_page_token, expected_offset", + [ + ("test_same_page_size", 2, 1, 1), + ("test_larger_page_size", 3, None, 0), + ], +) +def test_page_increment_paginator_strategy(test_name, page_size, expected_next_page_token, expected_offset): + paginator_strategy = PageIncrement(page_size) + assert paginator_strategy._offset == 0 + + response = requests.Response() + + response.headers = {"A_HEADER": "HEADER_VALUE"} + response_body = {"next": "https://airbyte.io/next_url"} + response._content = json.dumps(response_body).encode("utf-8") + last_records = [{"id": 0}, {"id": 1}] + + next_page_token = paginator_strategy.next_page_token(response, last_records) + assert expected_next_page_token == next_page_token + assert expected_offset == paginator_strategy._offset diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_request_option.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_request_option.py new file mode 100644 index 000000000000..0ccedc6b4d14 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_request_option.py @@ -0,0 +1,33 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import pytest +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType + + +@pytest.mark.parametrize( + "test_name, option_type, field_name, should_raise", + [ + ("test_limit_path_no_field_name", RequestOptionType.path, None, False), + ("test_limit_path_with_field_name", RequestOptionType.path, "field", True), + ("test_limit_param_no_field_name", RequestOptionType.request_parameter, None, True), + ("test_limit_param_with_field_name", RequestOptionType.request_parameter, "field", False), + ("test_limit_header_no_field_name", RequestOptionType.header, None, True), + ("test_limit_header_with_field_name", RequestOptionType.header, "field", False), + ("test_limit_data_no_field_name", RequestOptionType.body_data, None, True), + ("test_limit_data_with_field_name", RequestOptionType.body_data, "field", False), + ("test_limit_json_no_field_name", RequestOptionType.body_json, None, True), + ("test_limit_json_with_field_name", RequestOptionType.body_json, "field", False), + ], +) +def test_request_option(test_name, option_type, field_name, should_raise): + try: + request_option = RequestOption(inject_into=option_type, field_name=field_name) + if should_raise: + assert False + assert request_option._field_name == field_name + assert request_option._option_type == option_type + except ValueError: + if not should_raise: + assert False diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py index 5f6a5f1abd25..d2cab9fc7e8f 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py @@ -22,7 +22,6 @@ ("test_value_depends_on_next_page_token", {"read_from_token": "{{ next_page_token['offset'] }}"}, {"read_from_token": 12345}), ("test_value_depends_on_config", {"read_from_config": "{{ config['option'] }}"}, {"read_from_config": "OPTION"}), ("test_none_value", {"missing_param": "{{ fake_path['date'] }}"}, {}), - ("test_return_empty_dict_for_string_templates", "Should return empty dict {{ stream_state['date'] }}", {}), ( "test_parameter_is_interpolated", {"{{ stream_state['date'] }} - {{stream_slice['start_date']}} - {{next_page_token['offset']}} - {{config['option']}}": "ABC"}, @@ -66,12 +65,9 @@ def test_interpolated_request_json(test_name, input_request_json, expected_reque "test_name, input_request_data, expected_request_data", [ ("test_static_map_data", {"a_static_request_param": "a_static_value"}, {"a_static_request_param": "a_static_value"}), - ("test_static_string_data", "a_static_value", "a_static_value"), - ("test_string_depends_on_state", "key={{ stream_state['date'] }}", "key=2021-01-01"), ("test_map_depends_on_stream_slice", {"read_from_slice": "{{ stream_slice['start_date'] }}"}, {"read_from_slice": "2020-01-01"}), - ("test_string_depends_on_next_page_token", "{{ next_page_token['page'] }} and {{ next_page_token['offset'] }}", "27 and 12345"), ("test_map_depends_on_config", {"read_from_config": "{{ config['option'] }}"}, {"read_from_config": "OPTION"}), - ("test_defaults_to_empty_string", None, ""), + ("test_defaults_to_empty_dict", None, {}), ("test_interpolated_keys", {"{{ stream_state['date'] }} - {{ next_page_token['offset'] }}": "ABC"}, {"2021-01-01 - 12345": "ABC"}), ], ) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py index 625f9c05cd4a..336248546546 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_interpolated_request_input_provider.py @@ -4,30 +4,9 @@ import pytest as pytest from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping -from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.requesters.interpolated_request_input_provider import InterpolatedRequestInputProvider -@pytest.mark.parametrize( - "test_name, input_request_data, expected_request_data", - [ - ("test_static_string_data", "a_static_value", "a_static_value"), - ("test_string_depends_on_state", "key={{ stream_state['state_key'] }}", "key=state_value"), - ("test_string_depends_on_next_page_token", "{{ next_page_token['token_key'] }} + ultra", "token_value + ultra"), - ], -) -def test_interpolated_string_request_input_provider(test_name, input_request_data, expected_request_data): - config = {"config_key": "value_of_config"} - stream_state = {"state_key": "state_value"} - next_page_token = {"token_key": "token_value"} - - provider = InterpolatedRequestInputProvider(config=config, request_inputs=input_request_data) - actual_request_data = provider.request_inputs(stream_state=stream_state, next_page_token=next_page_token) - - assert isinstance(provider._interpolator, InterpolatedString) - assert actual_request_data == expected_request_data - - @pytest.mark.parametrize( "test_name, input_request_data, expected_request_data", [ diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 69e78d120217..b759d2c9ec3c 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -10,6 +10,7 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever @@ -24,6 +25,7 @@ def test_simple_retriever(): paginator = MagicMock() next_page_token = {"cursor": "cursor_value"} + paginator.path.return_value = None paginator.next_page_token.return_value = next_page_token record_selector = MagicMock() @@ -166,3 +168,99 @@ def test_backoff_time(test_name, response_action, retry_in, expected_backoff_tim assert False except ValueError: pass + + +@pytest.mark.parametrize( + "test_name, paginator_mapping, expected_mapping", + [ + ("test_only_base_headers", {}, {"key": "value"}), + ("test_header_from_pagination", {"offset": 1000}, {"key": "value", "offset": 1000}), + ("test_duplicate_header", {"key": 1000}, None), + ], +) +def test_get_request_options_from_pagination(test_name, paginator_mapping, expected_mapping): + paginator = MagicMock() + paginator.request_headers.return_value = paginator_mapping + paginator.request_params.return_value = paginator_mapping + paginator.request_body_data.return_value = paginator_mapping + paginator.request_body_json.return_value = paginator_mapping + requester = MagicMock() + + base_mapping = {"key": "value"} + requester.request_headers.return_value = base_mapping + requester.request_params.return_value = base_mapping + requester.request_body_data.return_value = base_mapping + requester.request_body_json.return_value = base_mapping + + record_selector = MagicMock() + retriever = SimpleRetriever("stream_name", primary_key, requester=requester, record_selector=record_selector, paginator=paginator) + + request_option_type_to_method = { + RequestOptionType.header: retriever.request_headers, + RequestOptionType.request_parameter: retriever.request_params, + RequestOptionType.body_data: retriever.request_body_data, + RequestOptionType.body_json: retriever.request_body_json, + } + + for _, method in request_option_type_to_method.items(): + if expected_mapping: + actual_mapping = method(None, None, None) + assert expected_mapping == actual_mapping + else: + try: + method(None, None, None) + assert False + except ValueError: + pass + + +@pytest.mark.parametrize( + "test_name, requester_body_data, paginator_body_data, expected_body_data", + [ + ("test_only_requester_mapping", {"key": "value"}, {}, {"key": "value"}), + ("test_only_requester_string", "key=value", {}, "key=value"), + ("test_requester_mapping_and_paginator_no_duplicate", {"key": "value"}, {"offset": 1000}, {"key": "value", "offset": 1000}), + ("test_requester_mapping_and_paginator_with_duplicate", {"key": "value"}, {"key": 1000}, None), + ("test_requester_string_and_paginator", "key=value", {"offset": 1000}, None), + ], +) +def test_request_body_data(test_name, requester_body_data, paginator_body_data, expected_body_data): + paginator = MagicMock() + paginator.request_body_data.return_value = paginator_body_data + requester = MagicMock() + + requester.request_body_data.return_value = requester_body_data + + record_selector = MagicMock() + retriever = SimpleRetriever("stream_name", primary_key, requester=requester, record_selector=record_selector, paginator=paginator) + + if expected_body_data: + actual_body_data = retriever.request_body_data(None, None, None) + assert expected_body_data == actual_body_data + else: + try: + retriever.request_body_data(None, None, None) + assert False + except ValueError: + pass + + +@pytest.mark.parametrize( + "test_name, requester_path, paginator_path, expected_path", + [ + ("test_path_from_requester", "/v1/path", None, "/v1/path"), + ("test_path_from_paginator", "/v1/path/", "/v2/paginator", "/v2/paginator"), + ], +) +def test_path(test_name, requester_path, paginator_path, expected_path): + paginator = MagicMock() + paginator.path.return_value = paginator_path + requester = MagicMock() + + requester.get_path.return_value = requester_path + + record_selector = MagicMock() + retriever = SimpleRetriever("stream_name", primary_key, requester=requester, record_selector=record_selector, paginator=paginator) + + actual_path = retriever.path(stream_state=None, stream_slice=None, next_page_token=None) + assert expected_path == actual_path diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index 4ecaa68d385b..a0dab7405294 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -16,7 +16,8 @@ from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester -from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator +from airbyte_cdk.sources.declarative.requesters.paginators.limit_paginator import LimitPaginator +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) @@ -149,9 +150,17 @@ def test_full_config(): class_name: airbyte_cdk.sources.declarative.extractors.record_filter.RecordFilter condition: "{{ record['id'] > stream_state['id'] }}" metadata_paginator: - class_name: "airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator.NextPageUrlPaginator" - next_page_token_template: - "next_page_url": "{{ decoded_response['_metadata']['next'] }}" + type: "LimitPaginator" + page_size: 10 + limit_option: + inject_into: request_parameter + field_name: page_size + page_token_option: + inject_into: path + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ decoded_response._metadata.next }}" + url_base: "https://api.sendgrid.com/v3/" next_page_url_from_token_partial: class_name: "airbyte_cdk.sources.declarative.interpolation.interpolated_string.InterpolatedString" string: "{{ next_page_token['next_page_url'] }}" @@ -324,9 +333,16 @@ def test_config_with_defaults(): file_path: "./source_sendgrid/schemas/{{name}}.yaml" retriever: paginator: - type: "NextPageUrlPaginator" - next_page_token_template: - next_page_token: "{{ decoded_response.metadata.next}}" + type: "LimitPaginator" + page_size: 10 + limit_option: + inject_into: request_parameter + field_name: page_size + page_token_option: + inject_into: path + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ decoded_response._metadata.next }}" requester: path: "/v3/marketing/lists" authenticator: @@ -353,11 +369,35 @@ def test_config_with_defaults(): assert stream._retriever._requester._authenticator._tokens == ["verysecrettoken"] assert stream._retriever._record_selector._extractor._transform == ".result[]" assert stream._schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.yaml" - assert isinstance(stream._retriever._paginator, NextPageUrlPaginator) - assert stream._retriever._paginator._url_base == "https://api.sendgrid.com" - assert stream._retriever._paginator._interpolated_paginator._next_page_token_template._mapping == { - "next_page_token": "{{ decoded_response.metadata.next}}" - } + assert isinstance(stream._retriever._paginator, LimitPaginator) + + assert stream._retriever._paginator._url_base._string == "https://api.sendgrid.com" + assert stream._retriever._paginator._page_size == 10 + + +def test_create_limit_paginator(): + content = """ + paginator: + type: "LimitPaginator" + page_size: 10 + url_base: "https://airbyte.io" + limit_option: + inject_into: request_parameter + field_name: page_size + page_token_option: + inject_into: path + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ decoded_response._metadata.next }}" + """ + config = parser.parse(content) + + paginator_config = config["paginator"] + paginator = factory.create_component(paginator_config, input_config)() + assert isinstance(paginator, LimitPaginator) + page_token_option = paginator._page_token_option + assert isinstance(page_token_option, RequestOption) + assert page_token_option.inject_into == RequestOptionType.path class TestCreateTransformations: