Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Low-Code CDK] Separate request path from RequestOption component #22398

Merged
merged 7 commits into from
Feb 8, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,9 @@ definitions:
page_size_option:
"$ref": "#/definitions/RequestOption"
page_token_option:
"$ref": "#/definitions/RequestOption"
anyOf:
- "$ref": "#/definitions/RequestOption"
- "$ref": "#/definitions/RequestPath"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -908,25 +910,34 @@ definitions:
items:
items:
type: string
RequestPath:
description: A component that specifies where in the request path a component's value should be inserted into.
type: object
required:
- type
properties:
type:
type: string
enum: [RequestPath]
RequestOption:
description: Describes an option to set on a request
description: A component that specifies the key field and where in the request a component's value should be inserted into.
type: object
required:
- type
- field_name
- inject_into
properties:
type:
type: string
enum: [RequestOption]
field_name:
type: string
inject_into:
enum:
- request_parameter
- header
- path
- body_data
- body_json
field_name:
type: string
Schemas:
description: The stream schemas representing the shape of the data emitted by the stream
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,18 +256,21 @@ class RemoveFields(BaseModel):
field_pointers: List[List[str]]


class RequestPath(BaseModel):
type: Literal["RequestPath"]


class InjectInto(Enum):
request_parameter = "request_parameter"
header = "header"
path = "path"
body_data = "body_data"
body_json = "body_json"


class RequestOption(BaseModel):
type: Literal["RequestOption"]
inject_into: InjectInto
field_name: Optional[str] = None
field_name: str


class Schemas(BaseModel):
Expand Down Expand Up @@ -364,7 +367,7 @@ class DefaultPaginator(BaseModel):
pagination_strategy: Union[CursorPagination, CustomPaginationStrategy, OffsetIncrement, PageIncrement]
decoder: Optional[JsonDecoder] = None
page_size_option: Optional[RequestOption] = None
page_token_option: Optional[RequestOption] = None
page_token_option: Optional[Union[RequestPath, RequestOption]] = None
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
# DefaultPaginator
"DefaultPaginator.decoder": "JsonDecoder",
"DefaultPaginator.page_size_option": "RequestOption",
"DefaultPaginator.page_token_option": "RequestOption",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getting rid of a default type because this could be a RequestPath now

# DpathExtractor
"DpathExtractor.decoder": "JsonDecoder",
# HttpRequester
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import RecordSelector as RecordSelectorModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import RemoveFields as RemoveFieldsModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import RequestOption as RequestOptionModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import RequestPath as RequestPathModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SessionTokenAuthenticator as SessionTokenAuthenticatorModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SimpleRetriever as SimpleRetrieverModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SingleSlice as SingleSliceModel
Expand All @@ -87,6 +88,7 @@
from airbyte_cdk.sources.declarative.requesters.paginators.strategies import CursorPaginationStrategy, OffsetIncrement, PageIncrement
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever, SimpleRetrieverTestReadDecorator
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader, InlineSchemaLoader, JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
Expand Down Expand Up @@ -158,6 +160,7 @@ def _init_mappings(self):
RecordFilterModel: self.create_record_filter,
RecordSelectorModel: self.create_record_selector,
RemoveFieldsModel: self.create_remove_fields,
RequestPathModel: self.create_request_path,
RequestOptionModel: self.create_request_option,
SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
SimpleRetrieverModel: self.create_simple_retriever,
Expand Down Expand Up @@ -651,6 +654,10 @@ def create_parent_stream_config(self, model: ParentStreamConfigModel, config: Co
def create_record_filter(model: RecordFilterModel, config: Config, **kwargs) -> RecordFilter:
return RecordFilter(condition=model.condition, config=config, parameters=model.parameters)

@staticmethod
def create_request_path(model: RequestPathModel, config: Config, **kwargs) -> RequestPath:
return RequestPath(parameters={})

@staticmethod
def create_request_option(model: RequestOptionModel, config: Config, **kwargs) -> RequestOption:
inject_into = RequestOptionType(model.inject_into.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState


Expand All @@ -28,10 +29,12 @@ class DefaultPaginator(Paginator):
paginator:
type: "DefaultPaginator"
page_size_option:
type: RequestOption
inject_into: request_parameter
field_name: limit
page_token_option:
option_type: path
type: RequestPath
path: "location"
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
Expand All @@ -45,6 +48,7 @@ class DefaultPaginator(Paginator):
paginator:
type: "DefaultPaginator"
page_size_option:
type: RequestOption
inject_into: header
field_name: page_size
pagination_strategy:
Expand All @@ -62,18 +66,20 @@ class DefaultPaginator(Paginator):
paginator:
type: "DefaultPaginator"
page_size_option:
type: RequestOption
inject_into: request_parameter
field_name: page_size
pagination_strategy:
type: "PageIncrement"
page_size: 5
page_token_option:
type: RequestOption
option_type: "request_parameter"
field_name: "page"
```
Attributes:
page_size_option (Optional[RequestOption]): the request option to set the page size. Cannot be injected in the path.
page_token_option (RequestOption): the request option to set the page token
page_token_option (Optional[RequestPath, RequestOption]): the request option to set the page token
pagination_strategy (PaginationStrategy): Strategy defining how to get the next page token
config (Config): connection config
url_base (Union[InterpolatedString, str]): endpoint's base url
Expand All @@ -87,11 +93,9 @@ class DefaultPaginator(Paginator):
decoder: Decoder = JsonDecoder(parameters={})
_token: Optional[Any] = field(init=False, repr=False, default=None)
page_size_option: Optional[RequestOption] = None
page_token_option: Optional[RequestOption] = None
page_token_option: Optional[Union[RequestPath, RequestOption]] = None

def __post_init__(self, parameters: Mapping[str, Any]):
if self.page_size_option and self.page_size_option.inject_into == RequestOptionType.path:
raise ValueError("page_size_option cannot be set in as path")
if self.page_size_option and not self.pagination_strategy.get_page_size():
raise ValueError("page_size_option cannot be set if the pagination strategy does not have a page_size")
if isinstance(self.url_base, str):
Expand All @@ -105,7 +109,7 @@ def next_page_token(self, response: requests.Response, last_records: List[Mappin
return None

def path(self):
if self._token and self.page_token_option and self.page_token_option.inject_into == RequestOptionType.path:
if self._token and self.page_token_option and isinstance(self.page_token_option, RequestPath):
# Replace url base to only return the path
return str(self._token).replace(self.url_base.eval(self.config), "")
else:
Expand Down Expand Up @@ -152,12 +156,16 @@ def reset(self):

def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
options = {}
if self.page_token_option and self.page_token_option.inject_into == option_type:
if option_type != RequestOptionType.path and self._token:
options[self.page_token_option.field_name] = self._token

if (
self.page_token_option
and self._token
and isinstance(self.page_token_option, RequestOption)
and self.page_token_option.inject_into == option_type
):
options[self.page_token_option.field_name] = self._token
if self.page_size_option and self.pagination_strategy.get_page_size() and self.page_size_option.inject_into == option_type:
if option_type != RequestOptionType.path:
options[self.page_size_option.field_name] = self.pagination_strategy.get_page_size()
options[self.page_size_option.field_name] = self.pagination_strategy.get_page_size()
return options


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Mapping, Optional
from typing import Any, Mapping


class RequestOptionType(Enum):
Expand All @@ -14,7 +14,6 @@ class RequestOptionType(Enum):

request_parameter = "request_parameter"
header = "header"
path = "path"
body_data = "body_data"
body_json = "body_json"

Expand All @@ -25,21 +24,10 @@ class RequestOption:
Describes an option to set on a request

Attributes:
field_name (str): Describes the name of the parameter to inject. None if option_type == path. Required otherwise.
inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
field_name (Optional[str]): Describes the name of the parameter to inject. None if option_type == path. Required otherwise.
"""

field_name: str
inject_into: RequestOptionType
parameters: InitVar[Mapping[str, Any]]
field_name: Optional[str] = None

def __post_init__(self, parameters: Mapping[str, Any]):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original implementation, only path RequestOption could skip field_name. Now that its been extracted, field_name is required and we can get rid of this entire validation

if self.inject_into == RequestOptionType.path:
if self.field_name is not None:
raise ValueError(f"RequestOption with path cannot have a field name. Get {self.field_name}")
elif self.field_name is None:
raise ValueError(f"RequestOption expected field name for type {self.inject_into}")

def is_path(self) -> bool:
"""Returns true if the parameter is the path to send the request to"""
return self.inject_into == RequestOptionType.path
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Mapping


@dataclass
class RequestPath:
"""
Describes that a component value should be inserted into the path
"""

parameters: InitVar[Mapping[str, Any]]
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ def __post_init__(self, parameters: Mapping[str, Any]):
if not self.end_datetime.datetime_format:
self.end_datetime.datetime_format = self.datetime_format

if self.start_time_option and self.start_time_option.inject_into == RequestOptionType.path:
raise ValueError("Start time cannot be passed by path")
if self.end_time_option and self.end_time_option.inject_into == RequestOptionType.path:
raise ValueError("End time cannot be passed by path")

def get_stream_state(self) -> StreamState:
return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ def __post_init__(self, parameters: Mapping[str, Any]):
if isinstance(self.cursor_field, str):
self.cursor_field = InterpolatedString(string=self.cursor_field, parameters=parameters)

if self.request_option and self.request_option.inject_into == RequestOptionType.path:
raise ValueError("Slice value cannot be injected in the path")
self._cursor = None

def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def _get_request_option(self, option_type: RequestOptionType, stream_slice: Stre
key = parent_config.stream_slice_field
value = stream_slice.get(key)
if value:
params.update({key: value})
params.update({parent_config.request_option.field_name: value})
return params

def get_stream_state(self) -> StreamState:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"paginator": {
"type": "DefaultPaginator",
"page_size_option": {"type": "RequestOption"},
"page_token_option": {"type": "RequestOption"},
"page_token_option": {},
},
},
id="test_default_paginator",
Expand Down
Loading