[low-code cdk] Enable configurable state checkpointing #14317

Merged
merged 21 commits into from
Jul 6, 2022
Changes from 19 commits
@@ -16,12 +16,21 @@ class DeclarativeStream(Stream):
     DeclarativeStream is a Stream that delegates most of its logic to its schema_loader and retriever
     """

-    def __init__(self, name, primary_key, schema_loader: SchemaLoader, retriever: Retriever, cursor_field: Optional[List[str]] = None):
+    def __init__(
+        self,
+        name: str,
+        primary_key,
+        schema_loader: SchemaLoader,
+        retriever: Retriever,
+        cursor_field: Optional[List[str]] = None,
+        checkpoint_interval: Optional[int] = None,
+    ):
         self._name = name
         self._primary_key = primary_key
         self._cursor_field = cursor_field or []
         self._schema_loader = schema_loader
         self._retriever = retriever
+        self._checkpoint_interval = checkpoint_interval

     @property
     def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
@@ -34,6 +43,20 @@ def name(self) -> str:
         """
         return self._name

+    @property
+    def state_checkpoint_interval(self) -> Optional[int]:
+        """
+        Decides how often to checkpoint state (i.e., emit a STATE message). E.g., if this returns a value of 100, then state is persisted after reading
+        100 records, then 200, 300, etc. A good default value is 1000, although your mileage may vary depending on the underlying data source.
+
+        Checkpointing a stream avoids re-reading records if a sync fails or is cancelled.
+
+        Return None if state should not be checkpointed, e.g., because records returned from the underlying data source are not returned in
+        ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of
+        created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read.
+        """
+        return self._checkpoint_interval
+
     @property
     def state(self) -> MutableMapping[str, Any]:
         return self._retriever.state
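
To make the docstring's checkpointing behaviour concrete, here is a minimal, self-contained sketch of how a read loop might consume such an interval. It is not the CDK's actual read loop: `read_with_checkpoints` and the message dictionaries are hypothetical names invented for illustration, and the sketch assumes records arrive in ascending order of the cursor field, which is the precondition the docstring gives for checkpointing.

```python
from typing import Any, Dict, Iterable, Iterator, Optional


def read_with_checkpoints(
    records: Iterable[Dict[str, Any]],
    cursor_field: str,
    checkpoint_interval: Optional[int],
) -> Iterator[Dict[str, Any]]:
    """Yield RECORD messages, interleaving STATE messages every N records.

    Hypothetical helper for illustration only; assumes records are sorted
    in ascending order of `cursor_field`.
    """
    state: Dict[str, Any] = {}
    count = 0
    for count, record in enumerate(records, start=1):
        # Track the latest cursor value seen so far.
        state = {cursor_field: record[cursor_field]}
        yield {"type": "RECORD", "record": record}
        # With an interval of None, no intermediate STATE is emitted.
        if checkpoint_interval and count % checkpoint_interval == 0:
            yield {"type": "STATE", "state": dict(state)}
    # Emit the final state once the full stream has been read, unless the
    # last record already triggered a checkpoint.
    just_checkpointed = bool(checkpoint_interval) and count % checkpoint_interval == 0
    if state and not just_checkpointed:
        yield {"type": "STATE", "state": dict(state)}


if __name__ == "__main__":
    sample = [{"updated_at": i} for i in range(1, 6)]
    for message in read_with_checkpoints(sample, "updated_at", checkpoint_interval=2):
        print(message)
```

With an interval of 2 and five records, this sketch emits STATE after the second and fourth records and once more at the end. With `checkpoint_interval=None` it emits a single STATE after the last record, matching the "only once the full stream has been read" case described in the docstring.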
@@ -26,14 +26,14 @@ def test():
     retriever.state = state
     retriever.read_records.return_value = records
     retriever.stream_slices.return_value = stream_slices
-    retriever.state_checkpoint_interval = checkpoint_interval

     stream = DeclarativeStream(
         name=name,
         primary_key=primary_key,
         cursor_field=cursor_field,
         schema_loader=schema_loader,
         retriever=retriever,
+        checkpoint_interval=checkpoint_interval,
     )

     assert stream.name == name
@@ -43,3 +43,4 @@ def test():
     assert stream.primary_key == primary_key
     assert stream.cursor_field == cursor_field
     assert stream.stream_slices(sync_mode=SyncMode.incremental, cursor_field=cursor_field, stream_state=None) == stream_slices
+    assert stream.state_checkpoint_interval == checkpoint_interval