From 5242ff8e95e58cad106f62d5f3b59a75c39ae940 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Fri, 5 Aug 2022 16:44:56 -0700 Subject: [PATCH] low-code connectors: reset pagination between stream slices (#15330) * reset pagination between stream slices * Update airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py Co-authored-by: Sherif A. Nada * Update airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py Co-authored-by: Sherif A. Nada * patch Co-authored-by: Sherif A. Nada --- .../requesters/paginators/limit_paginator.py | 3 +++ .../requesters/paginators/no_pagination.py | 4 ++++ .../declarative/requesters/paginators/paginator.py | 6 ++++++ .../strategies/cursor_pagination_strategy.py | 4 ++++ .../paginators/strategies/offset_increment.py | 3 +++ .../paginators/strategies/page_increment.py | 9 ++++++--- .../paginators/strategies/pagination_strategy.py | 6 ++++++ .../declarative/retrievers/simple_retriever.py | 1 + .../requesters/paginators/test_limit_paginator.py | 11 +++++++++++ .../requesters/paginators/test_offset_increment.py | 3 +++ .../requesters/paginators/test_page_increment.py | 7 +++++-- .../declarative/retrievers/test_simple_retriever.py | 13 ++++++++++--- 12 files changed, 62 insertions(+), 8 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py index 675270f0da87..bf9adcbd515b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py @@ -147,6 +147,9 @@ def get_request_body_json( ) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.body_json) + def reset(self): + self.pagination_strategy.reset() + def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]: options = {} if self.page_token_option.inject_into == option_type: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py index ac54ba0bc70e..210b00c73123 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py @@ -59,3 +59,7 @@ def get_request_body_json( def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]: return {} + + def reset(self): + # No state to reset + pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py index e77ca744b3ed..68b18307e088 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py @@ -19,6 +19,12 @@ class Paginator(RequestOptionsProvider): If the next_page_token is the path to the next page of records, then it should be accessed through the `path` method """ + @abstractmethod + def reset(self): + """ + Reset the pagination's inner state + """ + @abstractmethod def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py index 09d036580f8f..81577159735b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py @@ -46,3 +46,7 @@ def next_page_token(self, response: requests.Response, last_records: List[Mappin return None token = self.cursor_value.eval(config=self.config, last_records=last_records, response=decoded_response) return token if token else None + + def reset(self): + # No state to reset + pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py index bfbd92df3e24..e6ab8a03fb58 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py @@ -31,3 +31,6 @@ def next_page_token(self, response: requests.Response, last_records: List[Mappin else: self._offset += len(last_records) return self._offset + + def reset(self): + self._offset = 0 diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py index f39ca388ada1..46e112a0397f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py @@ -23,11 +23,14 @@ class PageIncrement(PaginationStrategy, JsonSchemaMixin): options: InitVar[Mapping[str, Any]] def __post_init__(self, options: Mapping[str, Any]): - self._offset = 0 + self._page = 0 def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]: if len(last_records) < self.page_size: return None else: - self._offset += 1 - return self._offset + self._page += 1 + return self._page + + def reset(self): + self._page = 0 diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py index 7174fc16a377..a2d9407a833d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py @@ -24,3 +24,9 @@ def next_page_token(self, response: requests.Response, last_records: List[Mappin :return: next page token. Returns None if there are no more pages to fetch """ pass + + @abstractmethod + def reset(self): + """ + Reset the pagination's inner state + """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 8eda1ec15401..4cfdbc8fd148 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -342,6 +342,7 @@ def read_records( ) -> Iterable[Mapping[str, Any]]: # Warning: use self.state instead of the stream_state passed as argument! stream_slice = stream_slice or {} # None-check + self.paginator.reset() records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state) for r in records_generator: self.stream_slicer.update_cursor(stream_slice, last_record=r) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_limit_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_limit_paginator.py index cbdae4e48531..26d55a0276ee 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_limit_paginator.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_limit_paginator.py @@ -3,6 +3,7 @@ # import json +from unittest.mock import MagicMock import pytest import requests @@ -159,3 +160,13 @@ def test_limit_cannot_be_set_in_path(): assert False except ValueError: pass + + +def test_reset(): + limit_request_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="limit", options={}) + page_token_request_option = RequestOption(inject_into=RequestOptionType.request_parameter, field_name="offset", options={}) + url_base = "https://airbyte.io" + config = {} + strategy = MagicMock() + LimitPaginator(2, limit_request_option, page_token_request_option, strategy, config, url_base, options={}).reset() + assert strategy.reset.called diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py index 7376ef155b43..c8f11a76ad60 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py @@ -30,3 +30,6 @@ def test_offset_increment_paginator_strategy(test_name, page_size, expected_next next_page_token = paginator_strategy.next_page_token(response, last_records) assert expected_next_page_token == next_page_token assert expected_offset == paginator_strategy._offset + + paginator_strategy.reset() + assert 0 == paginator_strategy._offset diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py index fa3808a916b0..9d85cf8298b9 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py @@ -18,7 +18,7 @@ ) def test_page_increment_paginator_strategy(test_name, page_size, expected_next_page_token, expected_offset): paginator_strategy = PageIncrement(page_size, options={}) - assert paginator_strategy._offset == 0 + assert paginator_strategy._page == 0 response = requests.Response() @@ -29,4 +29,7 @@ def test_page_increment_paginator_strategy(test_name, page_size, expected_next_p next_page_token = paginator_strategy.next_page_token(response, last_records) assert expected_next_page_token == next_page_token - assert expected_offset == paginator_strategy._offset + assert expected_offset == paginator_strategy._page + + paginator_strategy.reset() + assert 0 == paginator_strategy._page diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index ad1ce696acb5..00d4e7fb7a87 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status import pytest @@ -15,13 +15,15 @@ from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever from airbyte_cdk.sources.streams.http.auth import NoAuth +from airbyte_cdk.sources.streams.http.http import HttpStream primary_key = "pk" records = [{"id": 1}, {"id": 2}] config = {} -def test_simple_retriever_full(): +@patch.object(HttpStream, "read_records", return_value=[]) +def test_simple_retriever_full(mock_http_stream): requester = MagicMock() request_params = {"param": "value"} requester.get_request_params.return_value = request_params @@ -53,6 +55,9 @@ def test_simple_retriever_full(): backoff_time = 60 should_retry = ResponseStatus.retry(backoff_time) requester.should_retry.return_value = should_retry + request_body_json = {"body": "json"} + requester.request_body_json.return_value = request_body_json + request_body_data = {"body": "data"} requester.get_request_body_data.return_value = request_body_data request_body_json = {"body": "json"} @@ -92,12 +97,14 @@ def test_simple_retriever_full(): assert not retriever.raise_on_http_errors assert retriever.should_retry(requests.Response()) assert retriever.backoff_time(requests.Response()) == backoff_time - assert retriever.request_body_data(None, None, None) == request_body_data assert retriever.request_body_json(None, None, None) == request_body_json assert retriever.request_kwargs(None, None, None) == request_kwargs assert retriever.cache_filename == cache_filename assert retriever.use_cache == use_cache + [r for r in retriever.read_records(SyncMode.full_refresh)] + paginator.reset.assert_called() + @pytest.mark.parametrize( "test_name, requester_response, expected_should_retry, expected_backoff_time",