diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 56264df727a7..13e992aea744 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.1.70 +- Bugfix: DatetimeStreamSlicer cast interpolated result to string before converting to datetime +- Bugfix: Set stream slicer's request options in SimpleRetriever + ## 0.1.69 - AbstractSource emits a state message when reading incremental even if there were no stream slices to process. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py index 287428c97cb9..651b58186e5d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -88,27 +88,55 @@ def should_retry(self, response: requests.Response) -> ResponseStatus: return self._error_handler.should_retry(response) def request_params( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: - return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token) + return self._request_options_provider.request_params( + stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + ) def request_headers( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._request_options_provider.request_headers(stream_state, stream_slice, next_page_token) + return self._request_options_provider.request_headers( + stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + ) def request_body_data( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Union[Mapping, str]]: - return self._request_options_provider.request_body_data(stream_state, stream_slice, next_page_token) + return self._request_options_provider.request_body_data( + stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + ) def request_body_json( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping]: - return self._request_options_provider.request_body_json(stream_state, stream_slice, next_page_token) + return self._request_options_provider.request_body_json( + stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + ) def request_kwargs( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: # todo: there are a few integrations that override the request_kwargs() method, but the use case for why kwargs over existing # constructs is a little unclear. We may revisit this, but for now lets leave it out of the DSL diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py index c7ba91fec3ad..6d34cdd5a32d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py @@ -11,7 +11,7 @@ from airbyte_cdk.sources.declarative.requesters.paginators.pagination_strategy import PaginationStrategy from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType -from airbyte_cdk.sources.declarative.types import Config +from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState class LimitPaginator(Paginator): @@ -117,22 +117,42 @@ def path(self): else: return None - def request_params(self) -> Mapping[str, Any]: + def request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.request_parameter) - def request_headers(self) -> Mapping[str, str]: + def request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, str]: return self._get_request_options(RequestOptionType.header) - def request_body_data(self) -> Mapping[str, Any]: + def request_body_data( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.body_data) - def request_body_json(self) -> Mapping[str, Any]: + def request_body_json( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.body_json) - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs - return {} - def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]: options = {} if self._page_token_option.inject_into == option_type: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py index e1592e14ef31..8877c829a7a2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py @@ -6,6 +6,7 @@ import requests from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator +from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState class NoPagination(Paginator): @@ -16,20 +17,40 @@ class NoPagination(Paginator): def path(self) -> Optional[str]: return None - def request_params(self) -> Mapping[str, Any]: + def request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} - def request_headers(self) -> Mapping[str, str]: + def request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, str]: return {} - def request_body_data(self) -> Union[Mapping[str, Any], str]: + def request_body_data( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Union[Mapping[str, Any], str]: return {} - def request_body_json(self) -> Mapping[str, Any]: - return {} - - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs + def request_body_json( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py index d26581ff985a..084d3b6a5e88 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py @@ -38,39 +38,3 @@ def path(self) -> Optional[str]: :return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token """ pass - - @abstractmethod - def request_params(self) -> Mapping[str, Any]: - """ - Specifies the query parameters that should be set on an outgoing HTTP request to fetch the next page of records. - - :return: the request parameters to set to fetch the next page - """ - pass - - @abstractmethod - def request_headers(self) -> Mapping[str, str]: - """ - Specifies the request headers that should be set on an outgoing HTTP request to fetch the next page of records. - - :return: the request headers to set to fetch the next page - """ - pass - - @abstractmethod - def request_body_data(self) -> Mapping[str, Any]: - """ - Specifies the body data that should be set on an outgoing HTTP request to fetch the next page of records. - - :return: the request body data to set to fetch the next page - """ - pass - - @abstractmethod - def request_body_json(self) -> Mapping[str, Any]: - """ - Specifies the json content that should be set on an outgoing HTTP request to fetch the next page of records. - - :return: the request body to set (as a json object) to fetch the next page - """ - pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py index c8470ac0c726..793594eb4c11 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py @@ -48,7 +48,11 @@ def __init__( self._body_json_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_json) def request_params( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: interpolated_value = self._parameter_interpolator.request_inputs(stream_state, stream_slice, next_page_token) if isinstance(interpolated_value, dict): @@ -56,18 +60,27 @@ def request_params( return {} def request_headers( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: return self._headers_interpolator.request_inputs(stream_state, stream_slice, next_page_token) def request_body_data( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Union[Mapping, str]]: return self._body_data_interpolator.request_inputs(stream_state, stream_slice, next_page_token) def request_body_json( self, - stream_state: StreamState, + *, + stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py index b9936c1045b8..425107afe29a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py @@ -20,7 +20,13 @@ class RequestOptionsProvider(ABC): """ @abstractmethod - def request_params(self, **kwargs) -> MutableMapping[str, Any]: + def request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> MutableMapping[str, Any]: """ Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. @@ -30,13 +36,21 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]: @abstractmethod def request_headers( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: """Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.""" @abstractmethod def request_body_data( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Union[Mapping, str]]: """ Specifies how to populate the body of the request with a non-JSON payload. @@ -50,7 +64,11 @@ def request_body_data( @abstractmethod def request_body_json( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping]: """ Specifies how to populate the body of the request with a JSON payload. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py index bb80a7cb8b41..8b7d0e045043 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py @@ -2,12 +2,13 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod +from abc import abstractmethod from enum import Enum from typing import Any, Mapping, MutableMapping, Optional import requests from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus +from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState from requests.auth import AuthBase @@ -21,7 +22,7 @@ class HttpMethod(Enum): POST = "POST" -class Requester(ABC): +class Requester(RequestOptionsProvider): @abstractmethod def get_authenticator(self) -> AuthBase: """ @@ -56,7 +57,8 @@ def get_method(self) -> HttpMethod: @abstractmethod def request_params( self, - stream_state: StreamState, + *, + stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: @@ -80,7 +82,11 @@ def should_retry(self, response: requests.Response) -> ResponseStatus: @abstractmethod def request_headers( - self, stream_state: StreamSlice, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: """ Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. @@ -89,7 +95,8 @@ def request_headers( @abstractmethod def request_body_data( self, - stream_state: StreamState, + *, + stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping[str, Any]]: @@ -106,7 +113,8 @@ def request_body_data( @abstractmethod def request_body_json( self, - stream_state: StreamState, + *, + stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping[str, Any]]: @@ -119,7 +127,8 @@ def request_body_json( @abstractmethod def request_kwargs( self, - stream_state: StreamState, + *, + stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 58a4bf13cfc9..b39a01d14a72 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -109,18 +109,13 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: assert should_retry.action == ResponseAction.RETRY return should_retry.retry_in - def request_headers( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None - ) -> Mapping[str, Any]: - """ - Specifies request headers. - Authentication headers will overwrite any overlapping headers returned from this method. - """ - # Warning: use self.state instead of the stream_state passed as argument! - return self._get_request_options(stream_slice, next_page_token, self._requester.request_headers, self._paginator.request_headers) - def _get_request_options( - self, stream_slice: Optional[StreamSlice], next_page_token: Optional[Mapping[str, Any]], requester_method, paginator_method + self, + stream_slice: Optional[StreamSlice], + next_page_token: Optional[Mapping[str, Any]], + requester_method, + paginator_method, + stream_slicer_method, ): """ Get the request_option from the requester and from the paginator @@ -133,11 +128,54 @@ def _get_request_options( :return: """ requester_mapping = requester_method(self.state, stream_slice, next_page_token) - paginator_mapping = paginator_method() - keys_intersection = set(requester_mapping.keys()) & set(paginator_mapping.keys()) - if keys_intersection: - raise ValueError(f"Duplicate keys found: {keys_intersection}") - return {**requester_mapping, **paginator_mapping} + requester_mapping_keys = set(requester_mapping.keys()) + paginator_mapping = paginator_method(self.state, stream_slice, next_page_token) + paginator_mapping_keys = set(paginator_mapping.keys()) + stream_slicer_mapping = stream_slicer_method(stream_slice) + stream_slicer_mapping_keys = set(stream_slicer_mapping.keys()) + + intersection = ( + (requester_mapping_keys & paginator_mapping_keys) + | (requester_mapping_keys & stream_slicer_mapping_keys) + | (paginator_mapping_keys & stream_slicer_mapping_keys) + ) + if intersection: + raise ValueError(f"Duplicate keys found: {intersection}") + return {**requester_mapping, **paginator_mapping, **stream_slicer_mapping} + + def request_headers( + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + ) -> Mapping[str, Any]: + """ + Specifies request headers. + Authentication headers will overwrite any overlapping headers returned from this method. + """ + return self._get_request_options( + stream_slice, + next_page_token, + self._requester.request_headers, + self._paginator.request_headers, + self._stream_slicer.request_headers, + ) + + def request_params( + self, + stream_state: StreamSlice, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> MutableMapping[str, Any]: + """ + Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. + + E.g: you might want to define query parameters for paging if next_page_token is not None. + """ + return self._get_request_options( + stream_slice, + next_page_token, + self._requester.request_params, + self._paginator.request_params, + self._stream_slicer.request_params, + ) def request_body_data( self, @@ -155,7 +193,9 @@ def request_body_data( At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. """ # Warning: use self.state instead of the stream_state passed as argument! - base_body_data = self._requester.request_body_data(self.state, stream_slice, next_page_token) + base_body_data = self._requester.request_body_data( + stream_state=self.state, stream_slice=stream_slice, next_page_token=next_page_token + ) if isinstance(base_body_data, str): paginator_body_data = self._paginator.request_body_data() if paginator_body_data: @@ -165,7 +205,11 @@ def request_body_data( else: return base_body_data return self._get_request_options( - stream_slice, next_page_token, self._requester.request_body_data, self._paginator.request_body_data + stream_slice, + next_page_token, + self._requester.request_body_data, + self._paginator.request_body_data, + self._stream_slicer.request_body_data, ) def request_body_json( @@ -181,7 +225,11 @@ def request_body_json( """ # Warning: use self.state instead of the stream_state passed as argument! return self._get_request_options( - stream_slice, next_page_token, self._requester.request_body_json, self._paginator.request_body_json + stream_slice, + next_page_token, + self._requester.request_body_json, + self._paginator.request_body_json, + self._stream_slicer.request_body_json, ) def request_kwargs( @@ -196,7 +244,7 @@ def request_kwargs( this method. Note that these options do not conflict with request-level options such as headers, request params, etc.. """ # Warning: use self.state instead of the stream_state passed as argument! - return self._requester.request_kwargs(self.state, stream_slice, next_page_token) + return self._requester.request_kwargs(stream_state=self.state, stream_slice=stream_slice, next_page_token=next_page_token) def path( self, @@ -220,20 +268,6 @@ def path( else: return self._requester.get_path(stream_state=self.state, stream_slice=stream_slice, next_page_token=next_page_token) - def request_params( - self, - stream_state: StreamSlice, - stream_slice: Optional[StreamSlice] = None, - next_page_token: Optional[Mapping[str, Any]] = None, - ) -> MutableMapping[str, Any]: - """ - Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. - - E.g: you might want to define query parameters for paging if next_page_token is not None. - """ - # Warning: use self.state instead of the stream_state passed as argument! - return self._get_request_options(stream_slice, next_page_token, self._requester.request_params, self._paginator.request_params) - @property def cache_filename(self) -> str: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py index dc0bc1b6b511..9c52c07abc87 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py @@ -8,6 +8,7 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState class CartesianProductStreamSlicer(StreamSlicer): @@ -37,21 +38,41 @@ def update_cursor(self, stream_slice: Mapping[str, Any], last_record: Optional[M for slicer in self._stream_slicers: slicer.update_cursor(stream_slice, last_record) - def request_params(self) -> Mapping[str, Any]: + def request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return dict(ChainMap(*[s.request_params() for s in self._stream_slicers])) - def request_headers(self) -> Mapping[str, Any]: - return dict(ChainMap(*[s.request_headers() for s in self._stream_slicers])) + def request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return dict(ChainMap(*[s.request_headers(stream_state, stream_slice, next_page_token) for s in self._stream_slicers])) - def request_body_data(self) -> Mapping[str, Any]: - return dict(ChainMap(*[s.request_body_data() for s in self._stream_slicers])) + def request_body_data( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return dict(ChainMap(*[s.request_body_data(stream_state, stream_slice, next_page_token) for s in self._stream_slicers])) - def request_body_json(self) -> Optional[Mapping]: - return dict(ChainMap(*[s.request_body_json() for s in self._stream_slicers])) - - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs - return {} + def request_body_json( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Optional[Mapping]: + return dict(ChainMap(*[s.request_body_json(stream_state, stream_slice, next_page_token) for s in self._stream_slicers])) def get_stream_state(self) -> Mapping[str, Any]: return dict(ChainMap(*[slicer.get_stream_state() for slicer in self._stream_slicers])) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index a9190cc1fe59..772a8fc51e25 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -75,11 +75,12 @@ def __init__( self._cursor_field = InterpolatedString.create(cursor_field, options=options) self._start_time_option = start_time_option self._end_time_option = end_time_option - self._stream_slice_field_start = InterpolatedString.create(stream_state_field_start or "start_date", options=options) - self._stream_slice_field_end = InterpolatedString.create(stream_state_field_end or "end_date", options=options) + self._stream_slice_field_start = InterpolatedString.create(stream_state_field_start or "start_time", options=options) + self._stream_slice_field_end = InterpolatedString.create(stream_state_field_end or "end_time", options=options) self._cursor = None # tracks current datetime self._cursor_end = None # tracks end of current stream slice self._lookback_window = lookback_window + self._options = options # If datetime format is not specified then start/end datetime should inherit it from the stream slicer if not self._start_datetime.datetime_format: @@ -205,27 +206,52 @@ def _parse_timedelta(cls, time_str): time_params = {name: float(param) for name, param in parts.groupdict().items() if param} return datetime.timedelta(**time_params) - def request_params(self) -> Mapping[str, Any]: - return self._get_request_options(RequestOptionType.request_parameter) - - def request_headers(self) -> Mapping[str, Any]: - return self._get_request_options(RequestOptionType.header) - - def request_body_data(self) -> Mapping[str, Any]: - return self._get_request_options(RequestOptionType.body_data) - - def request_body_json(self) -> Mapping[str, Any]: - return self._get_request_options(RequestOptionType.body_json) + def request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.request_parameter, stream_slice) + + def request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.header, stream_slice) + + def request_body_data( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.body_data, stream_slice) + + def request_body_json( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.body_json, stream_slice) def request_kwargs(self) -> Mapping[str, Any]: # Never update kwargs return {} - def _get_request_options(self, option_type): + def _get_request_options(self, option_type: RequestOptionType, stream_slice: StreamSlice): options = {} if self._start_time_option and self._start_time_option.inject_into == option_type: - if self._cursor: - options[self._start_time_option.field_name] = self._cursor + options[self._start_time_option.field_name] = stream_slice.get( + self._stream_slice_field_start.eval(self._config, **self._options) + ) if self._end_time_option and self._end_time_option.inject_into == option_type: - options[self._end_time_option.field_name] = self._cursor_end + options[self._end_time_option.field_name] = stream_slice.get(self._stream_slice_field_end.eval(self._config, **self._options)) return options diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py index 719612bfa94b..2dbea6841de8 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py @@ -53,22 +53,38 @@ def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] def get_stream_state(self) -> StreamState: return {self._cursor_field.eval(self._config): self._cursor} if self._cursor else {} - def request_params(self) -> Mapping[str, Any]: + def request_params( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.request_parameter) - def request_headers(self) -> Mapping[str, Any]: + def request_headers( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.header) - def request_body_data(self) -> Mapping[str, Any]: + def request_body_data( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.body_data) - def request_body_json(self) -> Mapping[str, Any]: + def request_body_json( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.body_json) - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs - return {} - def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: return [{self._cursor_field.eval(self._config): slice_value} for slice_value in self._slice_values] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py index a7571a6afb51..161cdae970ff 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py @@ -12,27 +12,46 @@ class SingleSlice(StreamSlicer): """Stream slicer returning only a single stream slice""" + def __init__(self, **options): + pass + def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None): pass def get_stream_state(self) -> StreamState: return {} - def request_params(self) -> Mapping[str, Any]: + def request_params( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} - def request_headers(self) -> Mapping[str, Any]: + def request_headers( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} - def request_body_data(self) -> Mapping[str, Any]: + def request_body_data( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} - def request_body_json(self) -> Mapping[str, Any]: + def request_body_json( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return {} def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[StreamSlice]: return [dict()] - - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs - return {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py index 17a65462f8f1..b387a7027509 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py @@ -54,22 +54,38 @@ def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] cursor.update({parent_stream_config.stream_slice_field: slice_value}) self._cursor = cursor - def request_params(self) -> Mapping[str, Any]: + def request_params( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.request_parameter) - def request_headers(self) -> Mapping[str, Any]: + def request_headers( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.header) - def request_body_data(self) -> Mapping[str, Any]: + def request_body_data( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: return self._get_request_option(RequestOptionType.body_data) - def request_body_json(self) -> Optional[Mapping]: + def request_body_json( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Optional[Mapping]: return self._get_request_option(RequestOptionType.body_json) - def request_kwargs(self) -> Mapping[str, Any]: - # Never update kwargs - return {} - def _get_request_option(self, option_type: RequestOptionType): params = {} for parent_config in self._parent_stream_configs: diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 87467b9c2f62..d314389c93d5 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.69", + version="0.1.70", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py index 7351baf8b1a7..65f458fecaf3 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py @@ -32,7 +32,7 @@ def test_interpolated_request_params(test_name, input_request_params, expected_request_params): provider = InterpolatedRequestOptionsProvider(config=config, request_parameters=input_request_params) - actual_request_params = provider.request_params(state, stream_slice, next_page_token) + actual_request_params = provider.request_params(stream_state=state, stream_slice=stream_slice, next_page_token=next_page_token) assert actual_request_params == expected_request_params @@ -56,7 +56,7 @@ def test_interpolated_request_params(test_name, input_request_params, expected_r def test_interpolated_request_json(test_name, input_request_json, expected_request_json): provider = InterpolatedRequestOptionsProvider(config=config, request_body_json=input_request_json) - actual_request_json = provider.request_body_json(state, stream_slice, next_page_token) + actual_request_json = provider.request_body_json(stream_state=state, stream_slice=stream_slice, next_page_token=next_page_token) assert actual_request_json == expected_request_json @@ -74,7 +74,7 @@ def test_interpolated_request_json(test_name, input_request_json, expected_reque def test_interpolated_request_data(test_name, input_request_data, expected_request_data): provider = InterpolatedRequestOptionsProvider(config=config, request_body_data=input_request_data) - actual_request_data = provider.request_body_data(state, stream_slice, next_page_token) + actual_request_data = provider.request_body_data(stream_state=state, stream_slice=stream_slice, next_page_token=next_page_token) assert actual_request_data == expected_request_data diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index ca994f8a3d6a..aa3b1ee215e6 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -168,29 +168,40 @@ def test_backoff_time(test_name, response_action, retry_in, expected_backoff_tim @pytest.mark.parametrize( - "test_name, paginator_mapping, expected_mapping", + "test_name, paginator_mapping, stream_slicer_mapping, expected_mapping", [ - ("test_only_base_headers", {}, {"key": "value"}), - ("test_header_from_pagination", {"offset": 1000}, {"key": "value", "offset": 1000}), - ("test_duplicate_header", {"key": 1000}, None), + ("test_only_base_headers", {}, {}, {"key": "value"}), + ("test_header_from_pagination", {"offset": 1000}, {}, {"key": "value", "offset": 1000}), + ("test_header_from_stream_slicer", {}, {"slice": "slice_value"}, {"key": "value", "slice": "slice_value"}), + ("test_duplicate_header_slicer", {}, {"key": "slice_value"}, None), + ("test_duplicate_header_slicer_paginator", {"k": "v"}, {"k": "slice_value"}, None), + ("test_duplicate_header_paginator", {"key": 1000}, {}, None), ], ) -def test_get_request_options_from_pagination(test_name, paginator_mapping, expected_mapping): +def test_get_request_options_from_pagination(test_name, paginator_mapping, stream_slicer_mapping, expected_mapping): paginator = MagicMock() paginator.request_headers.return_value = paginator_mapping paginator.request_params.return_value = paginator_mapping paginator.request_body_data.return_value = paginator_mapping paginator.request_body_json.return_value = paginator_mapping - requester = MagicMock() + + stream_slicer = MagicMock() + stream_slicer.request_headers.return_value = stream_slicer_mapping + stream_slicer.request_params.return_value = stream_slicer_mapping + stream_slicer.request_body_data.return_value = stream_slicer_mapping + stream_slicer.request_body_json.return_value = stream_slicer_mapping base_mapping = {"key": "value"} + requester = MagicMock() requester.request_headers.return_value = base_mapping requester.request_params.return_value = base_mapping requester.request_body_data.return_value = base_mapping requester.request_body_json.return_value = base_mapping record_selector = MagicMock() - retriever = SimpleRetriever("stream_name", primary_key, requester=requester, record_selector=record_selector, paginator=paginator) + retriever = SimpleRetriever( + "stream_name", primary_key, requester=requester, record_selector=record_selector, paginator=paginator, stream_slicer=stream_slicer + ) request_option_type_to_method = { RequestOptionType.header: retriever.request_headers, diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py index e38ba23f7226..28563fbdaa9d 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py @@ -49,15 +49,15 @@ ), ], [ - {"owner_resource": "customer", "start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"owner_resource": "customer", "start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"owner_resource": "customer", "start_date": "2021-01-03", "end_date": "2021-01-03"}, - {"owner_resource": "store", "start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"owner_resource": "store", "start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"owner_resource": "store", "start_date": "2021-01-03", "end_date": "2021-01-03"}, - {"owner_resource": "subscription", "start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"owner_resource": "subscription", "start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"owner_resource": "subscription", "start_date": "2021-01-03", "end_date": "2021-01-03"}, + {"owner_resource": "customer", "start_time": "2021-01-01", "end_time": "2021-01-01"}, + {"owner_resource": "customer", "start_time": "2021-01-02", "end_time": "2021-01-02"}, + {"owner_resource": "customer", "start_time": "2021-01-03", "end_time": "2021-01-03"}, + {"owner_resource": "store", "start_time": "2021-01-01", "end_time": "2021-01-01"}, + {"owner_resource": "store", "start_time": "2021-01-02", "end_time": "2021-01-02"}, + {"owner_resource": "store", "start_time": "2021-01-03", "end_time": "2021-01-03"}, + {"owner_resource": "subscription", "start_time": "2021-01-01", "end_time": "2021-01-01"}, + {"owner_resource": "subscription", "start_time": "2021-01-02", "end_time": "2021-01-02"}, + {"owner_resource": "subscription", "start_time": "2021-01-03", "end_time": "2021-01-03"}, ], ), ], diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py index c34f4ada975d..a8c8cd287462 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py @@ -43,16 +43,16 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, - {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, - {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, - {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-01T00:00:00.000000+0000"}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T00:00:00.000000+0000"}, + {"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T00:00:00.000000+0000"}, + {"start_time": "2021-01-04T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -65,11 +65,11 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, - {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-02T00:00:00.000000+0000"}, + {"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -82,12 +82,12 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -100,11 +100,11 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( - "test_end_date_greater_than_now", + "test_end_time_greater_than_now", None, MinMaxDatetime("2021-12-28T00:00:00.000000+0000"), MinMaxDatetime(f"{(FAKE_NOW + datetime.timedelta(days=1)).strftime(datetime_format)}"), @@ -113,15 +113,15 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-12-28T00:00:00.000000+0000", "end_date": "2021-12-28T00:00:00.000000+0000"}, - {"start_date": "2021-12-29T00:00:00.000000+0000", "end_date": "2021-12-29T00:00:00.000000+0000"}, - {"start_date": "2021-12-30T00:00:00.000000+0000", "end_date": "2021-12-30T00:00:00.000000+0000"}, - {"start_date": "2021-12-31T00:00:00.000000+0000", "end_date": "2021-12-31T00:00:00.000000+0000"}, - {"start_date": "2022-01-01T00:00:00.000000+0000", "end_date": "2022-01-01T00:00:00.000000+0000"}, + {"start_time": "2021-12-28T00:00:00.000000+0000", "end_time": "2021-12-28T00:00:00.000000+0000"}, + {"start_time": "2021-12-29T00:00:00.000000+0000", "end_time": "2021-12-29T00:00:00.000000+0000"}, + {"start_time": "2021-12-30T00:00:00.000000+0000", "end_time": "2021-12-30T00:00:00.000000+0000"}, + {"start_time": "2021-12-31T00:00:00.000000+0000", "end_time": "2021-12-31T00:00:00.000000+0000"}, + {"start_time": "2022-01-01T00:00:00.000000+0000", "end_time": "2022-01-01T00:00:00.000000+0000"}, ], ), ( - "test_start_date_greater_than_end_date", + "test_start_date_greater_than_end_time", None, MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), MinMaxDatetime("2021-01-05T00:00:00.000000+0000"), @@ -130,7 +130,7 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, ], ), ( @@ -143,12 +143,12 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -161,9 +161,9 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -176,12 +176,12 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ( @@ -194,11 +194,11 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, - {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, - {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, - {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-01T00:00:00.000000+0000"}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T00:00:00.000000+0000"}, + {"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T00:00:00.000000+0000"}, + {"start_time": "2021-01-04T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, ], ), ( @@ -211,11 +211,11 @@ def mock_datetime_now(monkeypatch): None, "%Y-%m-%d", [ - {"start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"start_date": "2021-01-03", "end_date": "2021-01-03"}, - {"start_date": "2021-01-04", "end_date": "2021-01-04"}, - {"start_date": "2021-01-05", "end_date": "2021-01-05"}, + {"start_time": "2021-01-01", "end_time": "2021-01-01"}, + {"start_time": "2021-01-02", "end_time": "2021-01-02"}, + {"start_time": "2021-01-03", "end_time": "2021-01-03"}, + {"start_time": "2021-01-04", "end_time": "2021-01-04"}, + {"start_time": "2021-01-05", "end_time": "2021-01-05"}, ], ), ( @@ -228,14 +228,14 @@ def mock_datetime_now(monkeypatch): "3d", datetime_format, [ - {"start_date": "2020-12-29T00:00:00.000000+0000", "end_date": "2020-12-29T00:00:00.000000+0000"}, - {"start_date": "2020-12-30T00:00:00.000000+0000", "end_date": "2020-12-30T00:00:00.000000+0000"}, - {"start_date": "2020-12-31T00:00:00.000000+0000", "end_date": "2020-12-31T00:00:00.000000+0000"}, - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, - {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, - {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, - {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2020-12-29T00:00:00.000000+0000", "end_time": "2020-12-29T00:00:00.000000+0000"}, + {"start_time": "2020-12-30T00:00:00.000000+0000", "end_time": "2020-12-30T00:00:00.000000+0000"}, + {"start_time": "2020-12-31T00:00:00.000000+0000", "end_time": "2020-12-31T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-01T00:00:00.000000+0000"}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T00:00:00.000000+0000"}, + {"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T00:00:00.000000+0000"}, + {"start_time": "2021-01-04T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, ], ), ( @@ -248,11 +248,11 @@ def mock_datetime_now(monkeypatch): "{{ config['does_not_exist'] }}", datetime_format, [ - {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, - {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, - {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, - {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, - {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-01T00:00:00.000000+0000"}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "end_time": "2021-01-02T00:00:00.000000+0000"}, + {"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T00:00:00.000000+0000"}, + {"start_time": "2021-01-04T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"}, ], ), ( @@ -265,11 +265,11 @@ def mock_datetime_now(monkeypatch): None, datetime_format, [ - {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, - {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, - {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, - {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, - {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + {"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"}, + {"start_time": "2021-01-07T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-08T00:00:00.000000+0000"}, + {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-09T00:00:00.000000+0000"}, + {"start_time": "2021-01-10T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), ], @@ -357,7 +357,7 @@ def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, ex "test_start_time_passed_by_req_param", RequestOptionType.request_parameter, "start_time", - {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, {}, {}, {}, @@ -367,7 +367,7 @@ def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, ex RequestOptionType.header, "start_time", {}, - {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, {}, {}, ), @@ -377,7 +377,7 @@ def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, ex "start_time", {}, {}, - {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, {}, ), ( @@ -387,7 +387,7 @@ def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, ex {}, {}, {}, - {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, ), ( "test_start_time_inject_into_path", @@ -396,7 +396,7 @@ def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, ex {}, {}, {}, - {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {"start_time": "2021-01-01T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, ), ], ) @@ -433,14 +433,14 @@ def test_request_option(test_name, inject_into, field_name, expected_req_params, end_time_option=end_request_option, config=config, ) - stream_slice = {cursor_field: "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"} + stream_slice = {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"} slicer.update_cursor(stream_slice) - assert expected_req_params == slicer.request_params() - assert expected_headers == slicer.request_headers() - assert expected_body_json == slicer.request_body_json() - assert expected_body_data == slicer.request_body_data() + assert expected_req_params == slicer.request_params(stream_slice=stream_slice) + assert expected_headers == slicer.request_headers(stream_slice=stream_slice) + assert expected_body_json == slicer.request_body_json(stream_slice=stream_slice) + assert expected_body_data == slicer.request_body_data(stream_slice=stream_slice) if __name__ == "__main__": diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py index ba94a1d6a945..ccb8ef40803c 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py @@ -98,7 +98,7 @@ def test_request_option(test_name, request_option, expected_req_params, expected stream_slice = {cursor_field: "customer"} slicer.update_cursor(stream_slice) - assert expected_req_params == slicer.request_params() + assert expected_req_params == slicer.request_params(stream_slice) assert expected_headers == slicer.request_headers() assert expected_body_json == slicer.request_body_json() assert expected_body_data == slicer.request_body_data()