Skip to content

Commit

Permalink
reset
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed Jul 20, 2022
1 parent 2a17a9b commit eb9a918
Showing 1 changed file with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.states.dict_state import DictState
from airbyte_cdk.sources.declarative.states.state import State
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.streams.http import HttpStream


Expand All @@ -25,9 +26,9 @@ def __init__(
primary_key,
requester: Requester,
record_selector: HttpSelector,
config: Config,
paginator: Optional[Paginator] = None,
stream_slicer: Optional[StreamSlicer] = SingleSlice(),
state: Optional[State] = None,
):
self._name = name
self._primary_key = primary_key
Expand All @@ -36,9 +37,9 @@ def __init__(
self._record_selector = record_selector
super().__init__(self._requester.get_authenticator())
self._iterator = stream_slicer
self._state: State = (state or DictState()).deep_copy()
self._last_response = None
self._last_records = None
self._config = config

@property
def name(self) -> str:
Expand Down Expand Up @@ -205,7 +206,6 @@ def request_params(
E.g: you might want to define query parameters for paging if next_page_token is not None.
"""
# Warning: use self.state instead of the stream_state passed as argument!

return self._get_request_options(stream_slice, next_page_token, self._requester.request_params, self._paginator.request_params)

@property
Expand Down Expand Up @@ -272,11 +272,10 @@ def read_records(
# Warning: use self.state instead of the stream_state passed as argument!
records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state)
for r in records_generator:
self._iterator.update_cursor(stream_slice, last_record=r)
self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_response=self._last_response, last_record=r)
yield r
else:
last_record = self._last_records[-1] if self._last_records else None
self._iterator.update_cursor(stream_slice, last_record=last_record)
self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_reponse=self._last_response)
yield from []

def stream_slices(
Expand All @@ -295,9 +294,9 @@ def stream_slices(

@property
def state(self) -> MutableMapping[str, Any]:
return self._iterator.get_stream_state()
return self._state.get_stream_state()

@state.setter
def state(self, value: MutableMapping[str, Any]):
"""State setter, accept state serialized by state getter."""
self._iterator.update_cursor(value)
self._state.set_state(value)

0 comments on commit eb9a918

Please sign in to comment.