Skip to content

Commit

Permalink
low-code connectors: Set slicer's request options (#15283)
Browse files Browse the repository at this point in the history
* requester is a request options provider

* get request options from slicer

* remove prints

* share interface

* actual fix with test

* small fix

* missing tests

* missing *

* simplify intersection logic

* bump cdk version
  • Loading branch information
girarda authored Aug 4, 2022
1 parent 017a092 commit 6e59cfd
Show file tree
Hide file tree
Showing 20 changed files with 487 additions and 267 deletions.
4 changes: 4 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.1.70
- Bugfix: DatetimeStreamSlicer cast interpolated result to string before converting to datetime
- Bugfix: Set stream slicer's request options in SimpleRetriever

## 0.1.69
- AbstractSource emits a state message when reading incremental even if there were no stream slices to process.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,27 +88,55 @@ def should_retry(self, response: requests.Response) -> ResponseStatus:
return self._error_handler.should_retry(response)

def request_params(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
return self._request_options_provider.request_params(stream_state, stream_slice, next_page_token)
return self._request_options_provider.request_params(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)

def request_headers(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._request_options_provider.request_headers(stream_state, stream_slice, next_page_token)
return self._request_options_provider.request_headers(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)

def request_body_data(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping, str]]:
return self._request_options_provider.request_body_data(stream_state, stream_slice, next_page_token)
return self._request_options_provider.request_body_data(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)

def request_body_json(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
return self._request_options_provider.request_body_json(stream_state, stream_slice, next_page_token)
return self._request_options_provider.request_body_json(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)

def request_kwargs(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
# todo: there are a few integrations that override the request_kwargs() method, but the use case for why kwargs over existing
# constructs is a little unclear. We may revisit this, but for now lets leave it out of the DSL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from airbyte_cdk.sources.declarative.requesters.paginators.pagination_strategy import PaginationStrategy
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState


class LimitPaginator(Paginator):
Expand Down Expand Up @@ -117,22 +117,42 @@ def path(self):
else:
return None

def request_params(self) -> Mapping[str, Any]:
def request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.request_parameter)

def request_headers(self) -> Mapping[str, str]:
def request_headers(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, str]:
return self._get_request_options(RequestOptionType.header)

def request_body_data(self) -> Mapping[str, Any]:
def request_body_data(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_data)

def request_body_json(self) -> Mapping[str, Any]:
def request_body_json(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_json)

def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}

def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
options = {}
if self._page_token_option.inject_into == option_type:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import requests
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState


class NoPagination(Paginator):
Expand All @@ -16,20 +17,40 @@ class NoPagination(Paginator):
def path(self) -> Optional[str]:
return None

def request_params(self) -> Mapping[str, Any]:
def request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def request_headers(self) -> Mapping[str, str]:
def request_headers(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, str]:
return {}

def request_body_data(self) -> Union[Mapping[str, Any], str]:
def request_body_data(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
return {}

def request_body_json(self) -> Mapping[str, Any]:
return {}

def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
def request_body_json(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,39 +38,3 @@ def path(self) -> Optional[str]:
:return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
"""
pass

@abstractmethod
def request_params(self) -> Mapping[str, Any]:
"""
Specifies the query parameters that should be set on an outgoing HTTP request to fetch the next page of records.
:return: the request parameters to set to fetch the next page
"""
pass

@abstractmethod
def request_headers(self) -> Mapping[str, str]:
"""
Specifies the request headers that should be set on an outgoing HTTP request to fetch the next page of records.
:return: the request headers to set to fetch the next page
"""
pass

@abstractmethod
def request_body_data(self) -> Mapping[str, Any]:
"""
Specifies the body data that should be set on an outgoing HTTP request to fetch the next page of records.
:return: the request body data to set to fetch the next page
"""
pass

@abstractmethod
def request_body_json(self) -> Mapping[str, Any]:
"""
Specifies the json content that should be set on an outgoing HTTP request to fetch the next page of records.
:return: the request body to set (as a json object) to fetch the next page
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,39 @@ def __init__(
self._body_json_interpolator = InterpolatedRequestInputProvider(config=config, request_inputs=request_body_json)

def request_params(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
interpolated_value = self._parameter_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
if isinstance(interpolated_value, dict):
return interpolated_value
return {}

def request_headers(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._headers_interpolator.request_inputs(stream_state, stream_slice, next_page_token)

def request_body_data(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping, str]]:
return self._body_data_interpolator.request_inputs(stream_state, stream_slice, next_page_token)

def request_body_json(
self,
stream_state: StreamState,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ class RequestOptionsProvider(ABC):
"""

@abstractmethod
def request_params(self, **kwargs) -> MutableMapping[str, Any]:
def request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
"""
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
Expand All @@ -30,13 +36,21 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]:

@abstractmethod
def request_headers(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
"""Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method."""

@abstractmethod
def request_body_data(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping, str]]:
"""
Specifies how to populate the body of the request with a non-JSON payload.
Expand All @@ -50,7 +64,11 @@ def request_body_data(

@abstractmethod
def request_body_json(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
"""
Specifies how to populate the body of the request with a JSON payload.
Expand Down
Loading

0 comments on commit 6e59cfd

Please sign in to comment.