Commit

[low-code connectors] Refactor paginator component so it owns the request options to set (#14433)

* checkout files from test branch

* read_incremental works

* reset to master

* remove dead code

* comment

* fix

* Add test

* comments

* utc

* format

* small fix

* Add test with rfc3339

* remove unused param

* fix test

* configurable state checkpointing

* update test

* start working on retrier

* retry predicate

* return response status

* look in error message

* cleanup test

* constant backoff strategy

* chain backoff strategy

* chain retrier

* Add to class types registry

* extract backoff time from header

* wait until

* update

* split file

* parse_records

* classmethod

* delete dead code

* comment

* comment

* comments

* fix

* test for instantiating chain retrier

* fix parsing

* cleanup

* fix

* reset

* never raise on http error

* remove print

* comment

* comment

* comment

* comment

* remove prints

* add declarative stream to registry

* start working on limit paginator

* support for offset pagination

* tests

* move limit value

* extract request option

* boilerplate

* page increment

* delete offset paginator

* update conditional paginator

* refactor and fix test

* fix test

* small fix

* Delete dead code

* Add docstrings

* quick fix

* exponential backoff

* fix test

* fix

* delete unused properties

* fix

* missing unit tests

* uppercase

* docstrings

* rename to success

* compare full request instead of just url

* rename module

* rename test file

* rename interface

* rename default retrier

* rename to compositeerrorhandler

* fix missing renames

* move action to filter

* str -> minmaxdatetime

* small fixes

* plural

* add example

* handle header variations

* also fix wait time from

* allow using a regex to extract the value

* group()

* docstring

* add docs

* update comment

* docstrings

* fix tests

* rename param

* cleanup stop_condition

* cleanup

* Add examples

* interpolated pagination strategy

* dont need duplicate class

* docstrings

* more docstrings

* docstrings

* update comment

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* version: Update Parquet library to latest release (#14502)

The upstream Parquet library that is currently pinned for use in the S3 destination plugin is over a year old. The current version is generating invalid schemas for date-time with time-zone fields which appears to be addressed in the `1.12.3` release of the library in commit apache/parquet-java@c72862b

* merge

* 🎉 Source Github: improve schema for stream `pull_request_commits` added "null" (#14613)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>

* Docs: Fixed broken links (#14622)

* fixing broken links

* more broken links

* source-hubspot: change mentioning of Mailchimp into HubSpot  doc (#14620)

* Helm Chart: Add external temporal option (#14597)

* conflict env configmap and chart lock

* reverting lock

* add eof lines and documentation on values yaml

* conflict json file

* rollback json

* solve conflict

* correct minio with new version

Co-authored-by: Guy Feldman <gfeldman@86labs.com>

* 🎉 Add YAML format to source-file reader (#14588)

* Add yaml reader

* Update docs

* Bumpversion of connector

* bump docs

* Update pyarrow dependency

* Upgrade pandas dependency

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>

* 🎉 Source Okta: add GroupMembers stream (#14380)

* add Group_Members stream to okta source

- Group_Members returns a list of users with the same schema as the Users stream.
- Create a shared users schema that both the group_members and users schemas reference.
- Add Group_Members stream to source connector

* add tests and fix logs schema

- fix the test error "None is not one of enums", raised even though the enum type includes both string and null; it comes from the JSON schema validator
https://github.com/python-jsonschema/jsonschema/blob/ddb87afad8f5d5c40600b5ede0ab96e4d4bdf7d3/jsonschema/_validators.py#L279-L285
- change group_members to use id as the cursor field since `filter` is not supported in the query string
- fix the abnormal state test on the logs stream: when `since` is abnormally large, `until` has to be defined with an equal or larger value
- remove the logs stream from the full sync test, because two full syncs always differ by at least one new log entry about the users or groups API

* last polish before submit the PR

- bump docker version
- update changelog
- add the right abnormal value for logs stream
- correct the sample catalog

* address comments:

- improve comments for until parameter under the logs stream
- add use_cache on groupMembers

* add use_cache to Group_Members

* change configured_catalog to test

* auto-bump connector version

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>

* split test files

* renames

* missing unit test

* add missing unit tests

* rename

* assert isinstance

* start extracting to their own files

* use final instead of classmethod

* assert we retry 429 errors

* Add log

* replace asserts with valueexceptions

* delete superfluous print statement

* fix factory so we don't need to union everything with strings

* get class_name from type

* remove from class types registry

* process error handlers one at a time

* sort

* delete print statement

* comment

* comment

* format

* delete unused file

* comment

* interpolatedboolean

* comment

* not optional

* not optional

* unit tests

* fix request body data

* add test

* move file to right module

* update

* reset to master

* format

* rename to pass_by

* rename to page size

* fix

* add test

* fix body data

* delete extra newlines

* move to subpackage

* fix imports

* handle str body data

* simplify

* fix typing

* always return a map

* rename to inject_into

* only accept enum

* delete conditional paginator

* only return body data

* missing test

* update docstrings

* update docstrings

* update comment

* rename

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
Co-authored-by: Tobias Macey <tmacey@boundlessnotions.com>
Co-authored-by: Serhii Chvaliuk <grubberr@gmail.com>
Co-authored-by: Amruta Ranade <11484018+Amruta-Ranade@users.noreply.github.com>
Co-authored-by: Bas Beelen <bjgbeelen@gmail.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Guy Feldman <gfeldman@86labs.com>
Co-authored-by: Christophe Duong <christophe.duong@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Yiyang Li <yiyangli2010@gmail.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
12 people authored and mfsiega-airbyte committed Jul 21, 2022
1 parent bdd307c commit 1f122d2
Showing 32 changed files with 879 additions and 382 deletions.
Original file line number Diff line number Diff line change
@@ -14,9 +14,9 @@
 from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import CompositeErrorHandler
 from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
 from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
-from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator
-from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator
-from airbyte_cdk.sources.declarative.requesters.paginators.offset_paginator import OffsetPaginator
+from airbyte_cdk.sources.declarative.requesters.paginators.limit_paginator import LimitPaginator
+from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pagination_strategy import CursorPaginationStrategy
+from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import OffsetIncrement
 from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
 from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
 from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
@@ -29,17 +29,17 @@
     "CartesianProductStreamSlicer": CartesianProductStreamSlicer,
     "CompositeErrorHandler": CompositeErrorHandler,
     "ConstantBackoffStrategy": ConstantBackoffStrategy,
+    "CursorPagination": CursorPaginationStrategy,
     "DatetimeStreamSlicer": DatetimeStreamSlicer,
     "DeclarativeStream": DeclarativeStream,
     "DefaultErrorHandler": DefaultErrorHandler,
     "ExponentialBackoffStrategy": ExponentialBackoffStrategy,
     "HttpRequester": HttpRequester,
-    "InterpolatedPaginator": InterpolatedPaginator,
     "JelloExtractor": JelloExtractor,
+    "LimitPaginator": LimitPaginator,
     "ListStreamSlicer": ListStreamSlicer,
     "MinMaxDatetime": MinMaxDatetime,
-    "NextPageUrlPaginator": NextPageUrlPaginator,
-    "OffsetPaginator": OffsetPaginator,
+    "OffsetIncrement": OffsetIncrement,
     "RemoveFields": RemoveFields,
     "TokenAuthenticator": TokenAuthenticator,
 }
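
The registry in this hunk maps the `type` string used in a declarative config to a Python class. A minimal sketch of that kind of lookup follows. The `resolve_class` and `build` helpers and the stand-in classes are hypothetical and only illustrate the idea; they are not the CDK's factory.

```python
from typing import Any, Mapping, Type


class LimitPaginator:  # stand-in for the real component
    def __init__(self, page_size: int):
        self.page_size = page_size


class OffsetIncrement:  # stand-in for the real strategy
    pass


# Hypothetical registry with the same shape as the mapping in the diff.
CLASS_TYPES_REGISTRY: Mapping[str, Type] = {
    "LimitPaginator": LimitPaginator,
    "OffsetIncrement": OffsetIncrement,
}


def resolve_class(type_name: str) -> Type:
    """Return the class registered under type_name, or raise ValueError."""
    try:
        return CLASS_TYPES_REGISTRY[type_name]
    except KeyError:
        raise ValueError(f"Unknown component type: {type_name}") from None


def build(definition: Mapping[str, Any]) -> Any:
    """Instantiate a component from a dict holding a "type" key plus keyword arguments."""
    kwargs = {key: value for key, value in definition.items() if key != "type"}
    return resolve_class(definition["type"])(**kwargs)


paginator = build({"type": "LimitPaginator", "page_size": 10})
```

Because keyword arguments are passed straight to the constructor in this sketch, a config key that does not match a constructor parameter name fails at instantiation time.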
Original file line number Diff line number Diff line change
@@ -12,11 +12,13 @@
 from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
 from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
 from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
+from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
 from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
 from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
 from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler
 from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
 from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
+from airbyte_cdk.sources.declarative.requesters.paginators.limit_paginator import RequestOption
 from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
 from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
 from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
@@ -35,19 +37,22 @@
from airbyte_cdk.sources.streams.core import Stream

DEFAULT_IMPLEMENTATIONS_REGISTRY: Mapping[Type, Type] = {
ConnectionChecker: CheckStream,
Decoder: JsonDecoder,
ErrorHandler: DefaultErrorHandler,
HttpResponseFilter: HttpResponseFilter,
HttpSelector: RecordSelector,
InterpolatedBoolean: InterpolatedBoolean,
InterpolatedRequestOptionsProvider: InterpolatedRequestOptionsProvider,
InterpolatedString: InterpolatedString,
MinMaxDatetime: MinMaxDatetime,
Paginator: NoPagination,
RequestOption: RequestOption,
RequestOptionsProvider: InterpolatedRequestOptionsProvider,
Requester: HttpRequester,
Retriever: SimpleRetriever,
SchemaLoader: JsonSchema,
HttpSelector: RecordSelector,
ConnectionChecker: CheckStream,
ErrorHandler: DefaultErrorHandler,
Decoder: JsonDecoder,
State: DictState,
StreamSlicer: SingleSlice,
RequestOptionsProvider: InterpolatedRequestOptionsProvider,
Paginator: NoPagination,
HttpResponseFilter: HttpResponseFilter,
Stream: DeclarativeStream,
MinMaxDatetime: MinMaxDatetime,
InterpolatedString: InterpolatedString,
StreamSlicer: SingleSlice,
}
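
`DEFAULT_IMPLEMENTATIONS_REGISTRY` pairs an interface type with the concrete class to use when a config does not name one (for example `Paginator: NoPagination`). One way such a mapping could be consulted is sketched below, using the type annotation of a constructor parameter. The `default_for_parameter` helper and the stand-in classes are assumptions made for illustration, not the CDK's own code.

```python
from abc import ABC, abstractmethod
from typing import Mapping, Optional, Type, get_type_hints


class Paginator(ABC):  # stand-in interface
    @abstractmethod
    def path(self) -> Optional[str]:
        ...


class NoPagination(Paginator):  # stand-in default implementation
    def path(self) -> Optional[str]:
        return None


class Requester:  # stand-in component with an annotated constructor parameter
    def __init__(self, paginator: Paginator):
        self.paginator = paginator


# Hypothetical defaults table: interface type -> concrete class.
DEFAULTS: Mapping[Type, Type] = {Paginator: NoPagination}


def default_for_parameter(cls: Type, parameter: str) -> Type:
    """Return the concrete class to use for an annotated constructor parameter."""
    interface = get_type_hints(cls.__init__)[parameter]
    # Fall back to the annotated type itself when no default is registered.
    return DEFAULTS.get(interface, interface)


requester = Requester(paginator=default_for_parameter(Requester, "paginator")())
```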
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #

-from typing import Any, Mapping, Union
+from typing import Any, Mapping

 from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
 from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
@@ -26,7 +26,7 @@ def __init__(self, *, config, request_inputs=None):

     def request_inputs(
         self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
-    ) -> Union[Mapping, str]:
+    ) -> Mapping[str, Any]:
         kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
         interpolated_value = self._interpolator.eval(self._config, **kwargs)


This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.paginators.pagination_strategy import PaginationStrategy
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.types import Config


class LimitPaginator(Paginator):
    """
    Limit paginator.
    Requests pages of results with a fixed size until the pagination strategy no longer returns a next_page_token

    Examples:
        1.
        * fetches up to 10 records at a time by setting the "page_size" request param to 10
        * updates the request path with "{{ decoded_response._metadata.next }}"
        `
          paginator:
            type: "LimitPaginator"
            page_size: 10
            limit_option:
              option_type: request_parameter
              field_name: page_size
            page_token_option:
              option_type: path
            pagination_strategy:
              type: "CursorPagination"
              cursor_value: "{{ decoded_response._metadata.next }}"
        `

        2.
        * fetches up to 5 records at a time by setting the "page_size" header to 5
        * increments a record counter and sets the request parameter "offset" to the value of the counter
        `
          paginator:
            type: "LimitPaginator"
            page_size: 5
            limit_option:
              option_type: header
              field_name: page_size
            pagination_strategy:
              type: "OffsetIncrement"
            page_token_option:
              option_type: "request_parameter"
              field_name: "offset"
        `

        3.
        * fetches up to 5 records at a time by setting the "page_size" request param to 5
        * increments a page counter and sets the request parameter "page" to the value of the counter
        `
          paginator:
            type: "LimitPaginator"
            page_size: 5
            limit_option:
              option_type: request_parameter
              field_name: page_size
            pagination_strategy:
              type: "PageIncrement"
            page_token_option:
              option_type: "request_parameter"
              field_name: "page"
        `
    """

    def __init__(
        self,
        page_size: int,
        limit_option: RequestOption,
        page_token_option: RequestOption,
        pagination_strategy: PaginationStrategy,
        config: Config,
        url_base: str,
        decoder: Optional[Decoder] = None,
    ):
        """
        :param page_size: the number of records to request
        :param limit_option: the request option to set the limit. Cannot be injected in the path.
        :param page_token_option: the request option to set the page token
        :param pagination_strategy: Strategy defining how to get the next page token
        :param config: connection config
        :param url_base: endpoint's base url
        :param decoder: decoder to decode the response
        """
        if limit_option.inject_into == RequestOptionType.path:
            raise ValueError("Limit parameter cannot be a path")
        self._page_size = page_size
        self._config = config
        self._limit_option = limit_option
        self._page_token_option = page_token_option
        self._pagination_strategy = pagination_strategy
        # Token returned by the strategy for the most recent response; None until a page has been read
        self._token = None
        if isinstance(url_base, str):
            url_base = InterpolatedString(url_base)
        self._url_base = url_base
        self._decoder = decoder or JsonDecoder()

    def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
        self._token = self._pagination_strategy.next_page_token(response, last_records)
        if self._token:
            return {"next_page_token": self._token}
        else:
            return None

    def path(self) -> Optional[str]:
        if self._token and self._page_token_option.inject_into == RequestOptionType.path:
            # Replace url base to only return the path
            return str(self._token).replace(self._url_base.eval(self._config), "")
        else:
            return None

    def request_params(self) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.request_parameter)

    def request_headers(self) -> Mapping[str, str]:
        return self._get_request_options(RequestOptionType.header)

    def request_body_data(self) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.body_data)

    def request_body_json(self) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.body_json)

    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
        # Collect the page token and the limit if they are injected into the given part of the request
        options = {}
        if self._page_token_option.inject_into == option_type:
            if option_type != RequestOptionType.path and self._token:
                options[self._page_token_option.field_name] = self._token
        if self._limit_option.inject_into == option_type:
            if option_type != RequestOptionType.path:
                options[self._limit_option.field_name] = self._page_size
        return options
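
To illustrate how a paginator that owns its request options drives a read loop, here is a self-contained sketch of the offset case from example 2 of the docstring. It does not import the CDK: `RequestOptionType`, `RequestOption`, `RecordCounter`, `SketchPaginator`, `DATA` and `fetch` are all simplified stand-ins invented for this illustration, and the record-counting strategy is only an assumption about how an offset strategy might behave.

```python
from enum import Enum
from typing import Any, Dict, List, Mapping, Optional


class RequestOptionType(Enum):  # stand-in for the CDK enum of the same name
    request_parameter = "request_parameter"
    header = "header"


class RequestOption:  # stand-in: where to inject a value and under which name
    def __init__(self, inject_into: RequestOptionType, field_name: str):
        self.inject_into = inject_into
        self.field_name = field_name


class RecordCounter:
    """Hypothetical strategy: the token is the number of records read so far."""

    def __init__(self) -> None:
        self._offset = 0

    def next_page_token(self, last_records: List[Mapping[str, Any]]) -> Optional[int]:
        if not last_records:  # an empty page ends pagination
            return None
        self._offset += len(last_records)
        return self._offset


class SketchPaginator:
    """Simplified paginator that owns both the limit and the page token options."""

    def __init__(self, page_size: int, limit_option: RequestOption, page_token_option: RequestOption, strategy: RecordCounter):
        self._page_size = page_size
        self._limit_option = limit_option
        self._page_token_option = page_token_option
        self._strategy = strategy
        self._token: Optional[int] = None

    def next_page_token(self, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
        self._token = self._strategy.next_page_token(last_records)
        return {"next_page_token": self._token} if self._token else None

    def request_params(self) -> Dict[str, Any]:
        return self._options(RequestOptionType.request_parameter)

    def request_headers(self) -> Dict[str, Any]:
        return self._options(RequestOptionType.header)

    def _options(self, option_type: RequestOptionType) -> Dict[str, Any]:
        options: Dict[str, Any] = {}
        if self._token and self._page_token_option.inject_into == option_type:
            options[self._page_token_option.field_name] = self._token
        if self._limit_option.inject_into == option_type:
            options[self._limit_option.field_name] = self._page_size
        return options


DATA = [{"id": i} for i in range(12)]  # fake API contents


def fetch(params: Mapping[str, Any]) -> List[Mapping[str, Any]]:
    """Fake endpoint honoring the "offset" and "page_size" request parameters."""
    offset = params.get("offset", 0)
    return DATA[offset : offset + params["page_size"]]


paginator = SketchPaginator(
    page_size=5,
    limit_option=RequestOption(RequestOptionType.request_parameter, "page_size"),
    page_token_option=RequestOption(RequestOptionType.request_parameter, "offset"),
    strategy=RecordCounter(),
)

collected: List[Mapping[str, Any]] = []
requested: List[Dict[str, Any]] = []
while True:
    params = paginator.request_params()  # the paginator decides what to send
    requested.append(params)
    page = fetch(params)
    collected.extend(page)
    if paginator.next_page_token(page) is None:
        break
```

The first request carries only the limit; later requests also carry the offset, and the loop stops when a page comes back empty.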

This file was deleted.

Original file line number Diff line number Diff line change
@@ -2,12 +2,27 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #

-from typing import Any, List, Mapping, Optional
+from typing import Any, List, Mapping, Optional, Union

 import requests
 from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator


 class NoPagination(Paginator):
-    def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
+    def path(self) -> Optional[str]:
+        return None
+
+    def request_params(self) -> Mapping[str, Any]:
+        return {}
+
+    def request_headers(self) -> Mapping[str, str]:
+        return {}
+
+    def request_body_data(self) -> Union[Mapping[str, Any], str]:
+        return {}
+
+    def request_body_json(self) -> Mapping[str, Any]:
+        return {}
+
+    def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]:
         return {}
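
Because `NoPagination` returns empty mappings, a caller can combine paginator options with its own options without special-casing the "no paginator" situation. The sketch below shows one possible merge. The `merge_request_params` helper is hypothetical, and rejecting duplicate keys is a design choice made for this illustration, not behavior confirmed by the diff.

```python
from typing import Any, Dict, Mapping


def merge_request_params(base: Mapping[str, Any], paginator_params: Mapping[str, Any]) -> Dict[str, Any]:
    """Combine a requester's own parameters with those supplied by the paginator."""
    overlap = base.keys() & paginator_params.keys()
    if overlap:
        # Assumption: silently overriding a parameter would hide a config mistake.
        raise ValueError(f"Parameters set in more than one place: {sorted(overlap)}")
    return {**base, **paginator_params}


# An empty mapping, as returned by a no-op paginator, leaves the base untouched.
without_pagination = merge_request_params({"status": "open"}, {})
with_pagination = merge_request_params({"status": "open"}, {"page_size": 5, "offset": 10})
```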