diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py index 5dfd37f43a09..f52e496bd3c4 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py @@ -16,12 +16,21 @@ class DeclarativeStream(Stream): DeclarativeStream is a Stream that delegates most of its logic to its schema_load and retriever """ - def __init__(self, name, primary_key, schema_loader: SchemaLoader, retriever: Retriever, cursor_field: Optional[List[str]] = None): + def __init__( + self, + name: str, + primary_key, + schema_loader: SchemaLoader, + retriever: Retriever, + cursor_field: Optional[List[str]] = None, + checkpoint_interval: Optional[int] = None, + ): self._name = name self._primary_key = primary_key self._cursor_field = cursor_field or [] self._schema_loader = schema_loader self._retriever = retriever + self._checkpoint_interval = checkpoint_interval @property def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: @@ -34,6 +43,20 @@ def name(self) -> str: """ return self._name + @property + def state_checkpoint_interval(self) -> Optional[int]: + """ + Decides how often to checkpoint state (i.e: emit a STATE message). E.g: if this returns a value of 100, then state is persisted after reading + 100 records, then 200, 300, etc.. A good default value is 1000 although your mileage may vary depending on the underlying data source. + + Checkpointing a stream avoids re-reading records in the case a sync is failed or cancelled. + + return None if state should not be checkpointed e.g: because records returned from the underlying data source are not returned in + ascending order with respect to the cursor field. This can happen if the source does not support reading records in ascending order of + created_at date (or whatever the cursor is). In those cases, state must only be saved once the full stream has been read. + """ + return self._checkpoint_interval + @property def state(self) -> MutableMapping[str, Any]: return self._retriever.state diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py index a5a05d286e4f..a060709ccf36 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py @@ -26,7 +26,6 @@ def test(): retriever.state = state retriever.read_records.return_value = records retriever.stream_slices.return_value = stream_slices - retriever.state_checkpoint_interval = checkpoint_interval stream = DeclarativeStream( name=name, @@ -34,6 +33,7 @@ def test(): cursor_field=cursor_field, schema_loader=schema_loader, retriever=retriever, + checkpoint_interval=checkpoint_interval, ) assert stream.name == name @@ -43,3 +43,4 @@ def test(): assert stream.primary_key == primary_key assert stream.cursor_field == cursor_field assert stream.stream_slices(sync_mode=SyncMode.incremental, cursor_field=cursor_field, stream_state=None) == stream_slices + assert stream.state_checkpoint_interval == checkpoint_interval