Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #20771] limiting the number of requests performed to the backe… #21525

Merged
merged 12 commits into from
Jan 24, 2023
Merged
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.22.0
current_version = 0.23.0
commit = False

[bumpversion:file:setup.py]
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.23.0
Limiting the number of HTTP requests during a test read

## 0.22.0
Surface the resolved manifest in the CDK

Expand Down
15 changes: 13 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union

from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteLogMessage,
AirbyteMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Level,
Status,
SyncMode,
)
Expand All @@ -34,6 +37,8 @@ class AbstractSource(Source, ABC):
in this class to create an Airbyte Specification compliant Source.
"""

SLICE_LOG_PREFIX = "slice:"

@abstractmethod
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
"""
Expand Down Expand Up @@ -236,7 +241,10 @@ def _read_incremental(
has_slices = False
for _slice in slices:
has_slices = True
logger.debug("Processing stream slice", extra={"slice": _slice})
if logger.isEnabledFor(logging.DEBUG):
yield AirbyteMessage(
type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}")
)
records = stream_instance.read_records(
sync_mode=SyncMode.incremental,
stream_slice=_slice,
Expand Down Expand Up @@ -285,7 +293,10 @@ def _read_full_refresh(
)
total_records_counter = 0
for _slice in slices:
logger.debug("Processing stream slice", extra={"slice": _slice})
if logger.isEnabledFor(logging.DEBUG):
yield AirbyteMessage(
type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}")
)
record_data_or_messages = stream_instance.read_records(
stream_slice=_slice,
sync_mode=SyncMode.full_refresh,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ class ManifestDeclarativeSource(DeclarativeSource):

VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "schemas", "spec", "streams", "type", "version"}

def __init__(self, source_config: ConnectionDefinition, debug: bool = False, construct_using_pydantic_models: bool = False):
def __init__(
self,
source_config: ConnectionDefinition,
debug: bool = False,
component_factory: ModelToComponentFactory = None,
construct_using_pydantic_models: bool = False,
):
"""
:param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
:param debug(bool): True if debug mode is enabled
Expand All @@ -71,7 +77,12 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False, con
self._legacy_source_config = resolved_source_config
self._debug = debug
self._legacy_factory = DeclarativeComponentFactory() # Legacy factory used to instantiate declarative components from the manifest
self._constructor = ModelToComponentFactory() # New factory which converts the manifest to Pydantic models to construct components
if component_factory:
self._constructor = component_factory
else:
self._constructor = (
ModelToComponentFactory()
) # New factory which converts the manifest to Pydantic models to construct components

self._validate_source()

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import DefaultPaginator
from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import DefaultPaginator, PaginatorTestReadDecorator
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy

__all__ = ["DefaultPaginator", "NoPagination", "PaginationStrategy", "Paginator"]
__all__ = ["DefaultPaginator", "NoPagination", "PaginationStrategy", "Paginator", "PaginatorTestReadDecorator"]
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,69 @@ def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, A
if option_type != RequestOptionType.path:
options[self.page_size_option.field_name] = self.pagination_strategy.get_page_size()
return options


class PaginatorTestReadDecorator(Paginator):
    """
    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
    pages that are queried throughout a read command.

    Only `next_page_token` and `reset` add behavior; every other call is forwarded as-is to the decorated paginator.
    """

    # Counting starts at 1: the page whose response is handed to the first `next_page_token` call is already accounted for.
    _PAGE_COUNT_BEFORE_FIRST_NEXT_CALL = 1

    def __init__(self, decorated, maximum_number_of_pages: int = 5):
        """
        :param decorated: The paginator that the calls are delegated to
        :param maximum_number_of_pages: Upper bound on the number of pages queried between two resets. Needs to be at least 1
        :raises ValueError: If maximum_number_of_pages is lower than 1
        """
        # Compare against None explicitly instead of relying on truthiness, otherwise 0 would skip the validation
        if maximum_number_of_pages is not None and maximum_number_of_pages < 1:
            raise ValueError(f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}")
        self._maximum_number_of_pages = maximum_number_of_pages
        self._decorated = decorated
        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL

    def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
        """
        Return the decorated paginator's next page token, or None once the maximum number of pages has been reached.

        When the limit is reached, the decorated paginator is not called at all.
        """
        if self._page_count >= self._maximum_number_of_pages:
            return None

        self._page_count += 1
        return self._decorated.next_page_token(response, last_records)

    def path(self):
        return self._decorated.path()

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._decorated.get_request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, str]:
        return self._decorated.get_request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._decorated.get_request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._decorated.get_request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

    def reset(self):
        """Reset the decorated paginator and restart the page count, so the full page budget is available again."""
        self._decorated.reset()
        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import abstractmethod
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional

Expand All @@ -12,7 +12,7 @@


@dataclass
class Paginator(RequestOptionsProvider, JsonSchemaMixin):
class Paginator(ABC, RequestOptionsProvider, JsonSchemaMixin):
"""
Defines the token to use to fetch the next page of records from the API.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
#

from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever, SimpleRetrieverTestReadDecorator

__all__ = ["Retriever", "SimpleRetriever"]
__all__ = ["Retriever", "SimpleRetriever", "SimpleRetrieverTestReadDecorator"]
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging
from dataclasses import InitVar, dataclass, field
from itertools import islice
from json import JSONDecodeError
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

Expand Down Expand Up @@ -416,6 +417,28 @@ def _parse_records_and_emit_request_and_responses(self, request, response, strea
yield from self.parse_response(response, stream_slice=stream_slice, stream_state=stream_state)


@dataclass
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
    """
    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
    slices that are queried throughout a read command.
    """

    # Upper bound on the number of slices returned by `stream_slices`
    maximum_number_of_slices: int = 5

    def __post_init__(self, options: Mapping[str, Any]):
        """
        Run the parent initialization, then validate the slice limit.

        :raises ValueError: If maximum_number_of_slices is lower than 1
        """
        super().__post_init__(options)
        # Compare against None explicitly instead of relying on truthiness, otherwise 0 would skip the validation and
        # `stream_slices` would silently return no slice at all
        if self.maximum_number_of_slices is not None and self.maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
            )

    def stream_slices(
        self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[StreamState] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Return at most `maximum_number_of_slices` slices from the parent retriever.

        `cursor_field` is accepted but not forwarded to the parent implementation. The truncation is lazy: slices beyond the
        limit are never pulled from the parent's iterable.
        """
        return islice(super().stream_slices(sync_mode=sync_mode, stream_state=stream_state), self.maximum_number_of_slices)


def _prepared_request_to_airbyte_message(request: requests.PreparedRequest) -> AirbyteMessage:
# FIXME: this should return some sort of trace message
request_dict = {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.22.0",
version="0.23.0",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
import requests
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import DefaultPaginator, RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import (
DefaultPaginator,
PaginatorTestReadDecorator,
RequestOption,
RequestOptionType,
)
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pagination_strategy import CursorPaginationStrategy


Expand Down Expand Up @@ -202,3 +207,25 @@ def test_reset():
strategy = MagicMock()
DefaultPaginator(strategy, config, url_base, options={}, page_size_option=page_size_request_option, page_token_option=page_token_request_option).reset()
assert strategy.reset.called


def test_limit_page_fetched():
    """The decorator hands out page tokens until the page limit is reached, then stops returning any."""
    page_limit = 5
    wrapped_paginator = DefaultPaginator(
        pagination_strategy=MagicMock(),
        config=MagicMock(),
        url_base=MagicMock(),
        options={},
        page_size_option=MagicMock(),
        page_token_option=MagicMock(),
    )
    limited_paginator = PaginatorTestReadDecorator(wrapped_paginator, page_limit)

    # One page is already counted before the first call, which leaves `page_limit - 1` calls that should yield a token
    calls_expected_to_succeed = page_limit - 1
    for _ in range(calls_expected_to_succeed):
        token = limited_paginator.next_page_token(MagicMock(), MagicMock())
        assert token

    token_after_limit = limited_paginator.next_page_token(MagicMock(), MagicMock())
    assert not token_after_limit
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
_prepared_request_to_airbyte_message,
_response_to_airbyte_message,
)
Expand Down Expand Up @@ -629,3 +630,28 @@ def test_response_to_airbyte_message(test_name, response_body, response_headers,
actual_airbyte_message = _response_to_airbyte_message(response)

assert expected_airbyte_message == actual_airbyte_message


def test_limit_stream_slices():
    """The decorator only returns the first `maximum_number_of_slices` slices produced by the stream slicer."""
    slice_limit = 4
    available_slices = _generate_slices(slice_limit * 2)

    # The slicer offers twice as many slices as the limit allows
    slicer = MagicMock()
    slicer.stream_slices.return_value = available_slices

    retriever_arguments = {
        "name": "stream_name",
        "primary_key": primary_key,
        "requester": MagicMock(),
        "paginator": MagicMock(),
        "record_selector": MagicMock(),
        "stream_slicer": slicer,
        "maximum_number_of_slices": slice_limit,
        "options": {},
        "config": {},
    }
    retriever = SimpleRetrieverTestReadDecorator(**retriever_arguments)

    returned_slices = retriever.stream_slices(sync_mode=SyncMode.incremental, stream_state=None)

    assert list(returned_slices) == _generate_slices(slice_limit)


def _generate_slices(number_of_slices):
    """
    Build `number_of_slices` slices of the form {"date": "YYYY-MM-DD"}, one per consecutive day starting on 2022-01-01.

    Dates are computed with real date arithmetic so that the values stay well-formed past nine slices (and past the end of
    the month) instead of yielding strings such as "2022-01-010".
    """
    from datetime import date, timedelta

    first_day = date(2022, 1, 1)
    return [{"date": (first_day + timedelta(days=offset)).isoformat()} for offset in range(number_of_slices)]
Loading