Skip to content

Commit

Permalink
low-code connectors: reset pagination between stream slices (#15330)
Browse files Browse the repository at this point in the history
* reset pagination between stream slices

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* patch

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
  • Loading branch information
girarda and sherifnada authored Aug 5, 2022
1 parent c5c13f0 commit 5242ff8
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ def get_request_body_json(
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_json)

def reset(self):
self.pagination_strategy.reset()

def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
options = {}
if self.page_token_option.inject_into == option_type:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,7 @@ def get_request_body_json(

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]:
return {}

def reset(self):
# No state to reset
pass
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ class Paginator(RequestOptionsProvider):
If the next_page_token is the path to the next page of records, then it should be accessed through the `path` method
"""

@abstractmethod
def reset(self):
"""
Reset the pagination's inner state
"""

@abstractmethod
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ def next_page_token(self, response: requests.Response, last_records: List[Mappin
return None
token = self.cursor_value.eval(config=self.config, last_records=last_records, response=decoded_response)
return token if token else None

def reset(self):
# No state to reset
pass
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ def next_page_token(self, response: requests.Response, last_records: List[Mappin
else:
self._offset += len(last_records)
return self._offset

def reset(self):
self._offset = 0
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ class PageIncrement(PaginationStrategy, JsonSchemaMixin):
options: InitVar[Mapping[str, Any]]

def __post_init__(self, options: Mapping[str, Any]):
self._offset = 0
self._page = 0

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
if len(last_records) < self.page_size:
return None
else:
self._offset += 1
return self._offset
self._page += 1
return self._page

def reset(self):
self._page = 0
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,9 @@ def next_page_token(self, response: requests.Response, last_records: List[Mappin
:return: next page token. Returns None if there are no more pages to fetch
"""
pass

@abstractmethod
def reset(self):
"""
Reset the pagination's inner state
"""
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
# Warning: use self.state instead of the stream_state passed as argument!
stream_slice = stream_slice or {} # None-check
self.paginator.reset()
records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state)
for r in records_generator:
self.stream_slicer.update_cursor(stream_slice, last_record=r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import json
from unittest.mock import MagicMock

import pytest
import requests
Expand Down Expand Up @@ -159,3 +160,13 @@ def test_limit_cannot_be_set_in_path():
assert False
except ValueError:
pass


def test_reset():
limit_request_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="limit", options={})
page_token_request_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="offset", options={})
url_base = "https://airbyte.io"
config = {}
strategy = MagicMock()
LimitPaginator(2, limit_request_option, page_token_request_option, strategy, config, url_base, options={}).reset()
assert strategy.reset.called
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ def test_offset_increment_paginator_strategy(test_name, page_size, expected_next
next_page_token = paginator_strategy.next_page_token(response, last_records)
assert expected_next_page_token == next_page_token
assert expected_offset == paginator_strategy._offset

paginator_strategy.reset()
assert 0 == paginator_strategy._offset
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)
def test_page_increment_paginator_strategy(test_name, page_size, expected_next_page_token, expected_offset):
paginator_strategy = PageIncrement(page_size, options={})
assert paginator_strategy._offset == 0
assert paginator_strategy._page == 0

response = requests.Response()

Expand All @@ -29,4 +29,7 @@ def test_page_increment_paginator_strategy(test_name, page_size, expected_next_p

next_page_token = paginator_strategy.next_page_token(response, last_records)
assert expected_next_page_token == next_page_token
assert expected_offset == paginator_strategy._offset
assert expected_offset == paginator_strategy._page

paginator_strategy.reset()
assert 0 == paginator_strategy._page
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status
import pytest
Expand All @@ -15,13 +15,15 @@
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.streams.http.auth import NoAuth
from airbyte_cdk.sources.streams.http.http import HttpStream

primary_key = "pk"
records = [{"id": 1}, {"id": 2}]
config = {}


def test_simple_retriever_full():
@patch.object(HttpStream, "read_records", return_value=[])
def test_simple_retriever_full(mock_http_stream):
requester = MagicMock()
request_params = {"param": "value"}
requester.get_request_params.return_value = request_params
Expand Down Expand Up @@ -53,6 +55,9 @@ def test_simple_retriever_full():
backoff_time = 60
should_retry = ResponseStatus.retry(backoff_time)
requester.should_retry.return_value = should_retry
request_body_json = {"body": "json"}
requester.request_body_json.return_value = request_body_json

request_body_data = {"body": "data"}
requester.get_request_body_data.return_value = request_body_data
request_body_json = {"body": "json"}
Expand Down Expand Up @@ -92,12 +97,14 @@ def test_simple_retriever_full():
assert not retriever.raise_on_http_errors
assert retriever.should_retry(requests.Response())
assert retriever.backoff_time(requests.Response()) == backoff_time
assert retriever.request_body_data(None, None, None) == request_body_data
assert retriever.request_body_json(None, None, None) == request_body_json
assert retriever.request_kwargs(None, None, None) == request_kwargs
assert retriever.cache_filename == cache_filename
assert retriever.use_cache == use_cache

[r for r in retriever.read_records(SyncMode.full_refresh)]
paginator.reset.assert_called()


@pytest.mark.parametrize(
"test_name, requester_response, expected_should_retry, expected_backoff_time",
Expand Down

0 comments on commit 5242ff8

Please sign in to comment.