From eb9a918a095a22c6849d50f8881589a1b58a9309 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 20 Jul 2022 07:51:12 -0700 Subject: [PATCH] reset --- .../declarative/retrievers/simple_retriever.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 1450c01da808..ce46fa4be270 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -12,9 +12,10 @@ from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator from airbyte_cdk.sources.declarative.requesters.requester import Requester from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever +from airbyte_cdk.sources.declarative.states.dict_state import DictState +from airbyte_cdk.sources.declarative.states.state import State from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer -from airbyte_cdk.sources.declarative.types import Config from airbyte_cdk.sources.streams.http import HttpStream @@ -25,9 +26,9 @@ def __init__( primary_key, requester: Requester, record_selector: HttpSelector, - config: Config, paginator: Optional[Paginator] = None, stream_slicer: Optional[StreamSlicer] = SingleSlice(), + state: Optional[State] = None, ): self._name = name self._primary_key = primary_key @@ -36,9 +37,9 @@ def __init__( self._record_selector = record_selector super().__init__(self._requester.get_authenticator()) self._iterator = stream_slicer + self._state: State = (state or DictState()).deep_copy() self._last_response = None self._last_records = None - self._config = config @property def name(self) -> str: @@ -205,7 +206,6 @@ def request_params( E.g: you might want to define query parameters for paging if next_page_token is not None. """ # Warning: use self.state instead of the stream_state passed as argument! - return self._get_request_options(stream_slice, next_page_token, self._requester.request_params, self._paginator.request_params) @property @@ -272,11 +272,10 @@ def read_records( # Warning: use self.state instead of the stream_state passed as argument! records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state) for r in records_generator: - self._iterator.update_cursor(stream_slice, last_record=r) + self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_response=self._last_response, last_record=r) yield r else: - last_record = self._last_records[-1] if self._last_records else None - self._iterator.update_cursor(stream_slice, last_record=last_record) + self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_reponse=self._last_response) yield from [] def stream_slices( @@ -295,9 +294,9 @@ def stream_slices( @property def state(self) -> MutableMapping[str, Any]: - return self._iterator.get_stream_state() + return self._state.get_stream_state() @state.setter def state(self, value: MutableMapping[str, Any]): """State setter, accept state serialized by state getter.""" - self._iterator.update_cursor(value) + self._state.set_state(value)