Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit a state message even if no records were read #15067

Merged
merged 13 commits into from
Aug 4, 2022
4 changes: 4 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ def _read_incremental(
)
logger.debug(f"Processing stream slices for {stream_name}", extra={"stream_slices": slices})
total_records_counter = 0
if not slices:
# Safety net to ensure we always emit at least one state message even if there are no slices
checkpoint = self._checkpoint_state(stream_instance, stream_instance.state, connector_state)
Copy link
Contributor

@tuliren tuliren Aug 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@girarda, there is an on-call issue that seems related to this new change.
https://sentry.io/organizations/airbytehq/issues/3521477693/?project=6527718
https://github.com/airbytehq/oncall/issues/466

'Events' object has no attribute 'state'

I don't understand the CDK very well, but it looks like stream_instance.state is only set conditionally:

if stream_state and "state" in dir(stream_instance):
  stream_instance.state = stream_state

That condition is different from the one here.

Should the state be set for more cases?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tuliren Alex is on PTO - I've asked someone from the team to assist.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thank you.

yield checkpoint
for _slice in slices:
logger.debug("Processing stream slice", extra={"slice": _slice})
records = stream_instance.read_records(
Expand Down
74 changes: 73 additions & 1 deletion airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,13 @@ def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
class MockStreamWithState(MockStream):
cursor_field = "cursor"

def __init__(self, inputs_and_mocked_outputs: List[Tuple[Mapping[str, Any], Iterable[Mapping[str, Any]]]], name: str, state=None):
super().__init__(inputs_and_mocked_outputs, name)
self._state = state

@property
def state(self):
return {}
return self._state
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set a state to verify that the emitted state is the same as the input state when no slices are read.


@state.setter
def state(self, value):
Expand Down Expand Up @@ -452,6 +456,74 @@ def test_with_slices(self, mocker):

assert expected == messages

def test_no_slices(self, mocker):
    """
    Tests that an incremental read returns at least one state message per stream even if no records were read:
    1. the streams produce no slices, so no records are read
    2. a state message is still output for each stream, carrying the state the stream was constructed with
    """
    slices = []
    stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}]
    state = {"cursor": "value"}

    def _make_stream(name):
        # Both streams are configured identically; only their name differs.
        return MockStreamWithState(
            [
                (
                    {
                        "sync_mode": SyncMode.incremental,
                        "stream_slice": s,
                        "stream_state": mocker.ANY,
                    },
                    stream_output,
                )
                for s in slices
            ],
            name=name,
            state=state,
        )

    s1 = _make_stream("s1")
    s2 = _make_stream("s2")

    mocker.patch.object(MockStreamWithState, "supports_incremental", return_value=True)
    mocker.patch.object(MockStreamWithState, "get_json_schema", return_value={})
    # stream_slices is patched to return the empty list, which is the case under test.
    mocker.patch.object(MockStreamWithState, "stream_slices", return_value=slices)
    mocker.patch.object(
        MockStreamWithState,
        "state_checkpoint_interval",
        new_callable=mocker.PropertyMock,
        return_value=2,
    )

    src = MockSource(streams=[s1, s2])
    catalog = ConfiguredAirbyteCatalog(
        streams=[
            _configured_stream(s1, SyncMode.incremental),
            _configured_stream(s2, SyncMode.incremental),
        ]
    )

    # Exactly one state message per stream and no record messages; the second
    # message carries the state of both s1 and s2.
    expected = [
        _state({"s1": state}),
        _state({"s1": state, "s2": state}),
    ]

    messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict))))

    assert expected == messages

def test_with_slices_and_interval(self, mocker):
"""
Tests that an incremental read which uses slices and a checkpoint interval:
Expand Down