[per-stream cdk] Support deserialization of legacy and per-stream state #16205
Conversation
```diff
@@ -91,10 +92,12 @@ def read(
         logger: logging.Logger,
         config: Mapping[str, Any],
         catalog: ConfiguredAirbyteCatalog,
-        state: MutableMapping[str, Any] = None,
+        state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
```
We should never expect a `MutableMapping[str, Any]`, but in order to retain backwards compatibility with OSS users who pull the latest CDK, we should continue to support the type as defined in the interface. The connector manager under the hood knows how to interpret and store this as legacy.
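A minimal sketch of that idea, using a simplified, hypothetical stand-in for the CDK's pydantic-generated `AirbyteStateMessage` (the real signature and class live in the CDK): the `Union`-typed parameter keeps accepting the legacy mapping while everything downstream sees only the per-stream list.

```python
from dataclasses import dataclass
from typing import Any, List, Mapping, MutableMapping, Union

# Hypothetical stand-in for the protocol model; the real class is generated from the protocol YAML.
@dataclass
class AirbyteStateMessage:
    type: str
    data: Mapping[str, Any]


def normalize_state(
    state: Union[List[AirbyteStateMessage], MutableMapping[str, Any], None] = None,
) -> List[AirbyteStateMessage]:
    """Accept both the legacy mapping and the per-stream list; always return the list form."""
    if not state:
        return []
    if isinstance(state, list):
        return state  # already a list of per-stream messages
    # Legacy mapping from an older interface: wrap the whole blob as a single LEGACY message.
    return [AirbyteStateMessage(type="LEGACY", data=state)]
```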
```diff
@@ -133,6 +136,10 @@ def read(
         logger.info(f"Finished syncing {self.name}")

+    @property
+    def per_stream_state_enabled(self):
```
Unused at the moment, but we'll want this defined so we can parallelize the next two tasks that use this function
```python
if not state_obj:
    return []
elif isinstance(state_obj, List):
    return [AirbyteStateMessage.parse_obj(state) for state in state_obj]
```
As part of instantiation, pydantic will verify correctness of the input. Granted, a lot of the input is optional at the top-level state (to support backwards compatibility).
Notes from over-the-shoulder review: see if pydantic can validate that at least 1 of the 3 fields is populated, or add validation here to ensure at least one of `global_` or `streams` is populated.
I believe this is what you ended up doing with `Type.check_at_least_one`. 👍
…lobal, or data fields
I've no blocking comments, just let me know what you think of my suggestions 😄
```python
return state

if not state_obj:
    return []
elif isinstance(state_obj, List):
```
nit for readability:

```python
is_per_stream_state = isinstance(state_obj, List)
if is_per_stream_state:
    return [AirbyteStateMessage.parse_obj(state) for state in state_obj]
else:
    return [AirbyteStateMessage(type=AirbyteStateType.LEGACY, data=state_obj)]
```
```python
class ConnectorStateManager:
    """
    ConnectorStateManager consolidates the various forms of a stream's incoming state message (STREAM / GLOBAL / LEGACY) under a common
```
I'm wondering if you could update this documentation as part of this PR?
I could be mistaken, but I thought the docstrings get published automatically as part of how Read the Docs works.
airbyte-cdk/python/airbyte_cdk/sources/connector_state_manager.py
```python
# To maintain backwards compatibility, all of AirbyteStateMessage's fields are optional, but we should validate
# that at least one of the three fields is defined.
@root_validator()
def check_at_least_one(cls, values):
    if values.get("global_") is None and values.get("data") is None and values.get("stream") is None:
        raise ValueError("AirbyteStateMessage should contain either a stream, global, or data field")
    return values
```
I think the protocol classes are auto-generated from the protocol definitions in YAML. This change will be removed on the next protocol update. A workaround could be to declare a `ValidatedStateType` inheriting from `Type` with this validator method.
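A plain-Python sketch of that workaround (all names here are hypothetical stand-ins; the real models are pydantic classes generated from the protocol YAML): keep the validation in a subclass so regenerating the protocol classes cannot erase it.

```python
from typing import Any, Mapping, Optional


class AirbyteStateMessage:
    """Stand-in for the auto-generated protocol model: all fields optional."""

    def __init__(
        self,
        stream: Optional[Mapping[str, Any]] = None,
        global_: Optional[Mapping[str, Any]] = None,
        data: Optional[Mapping[str, Any]] = None,
    ):
        self.stream = stream
        self.global_ = global_
        self.data = data


class ValidatedStateMessage(AirbyteStateMessage):
    """Subclass holding the 'at least one field set' check, leaving the
    generated class untouched so the next protocol update keeps working."""

    def __init__(self, stream=None, global_=None, data=None):
        if stream is None and global_ is None and data is None:
            raise ValueError("AirbyteStateMessage should contain either a stream, global, or data field")
        super().__init__(stream=stream, global_=global_, data=data)
```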
Really good callout! It might be a little heavy-handed to define a new class and pass it everywhere for a pretty simple check. Instead I'm just going to do a conditional check in `read_state()` that raises a ValueError if the input is invalid, and leave airbyte_protocol as is.
```python
"test_name, input_state, expected_legacy_state",
[
    (
        "test_legacy_input_state",
        [AirbyteStateMessage(type=AirbyteStateType.LEGACY, data={"actresses": {"id": "seehorn_rhea"}})],
        {"actresses": {"id": "seehorn_rhea"}},
    ),
    (
        "test_supports_legacy_json_blob",
        {
            "actors": {"created_at": "1962-10-22"},
            "actresses": {"id": "seehorn_rhea"},
        },
        {"actors": {"created_at": "1962-10-22"}, "actresses": {"id": "seehorn_rhea"}},
    ),
    ("test_initialize_empty_mapping_by_default", {}, {}),
    ("test_initialize_empty_state", [], {}),
],
```
I'm not sure the first element of the tuple is actually used as a test id by pytest. `pytest.param` with an explicit `id` would give the same readable names:

```python
"input_state, expected_legacy_state",
[
    pytest.param(
        [AirbyteStateMessage(type=AirbyteStateType.LEGACY, data={"actresses": {"id": "seehorn_rhea"}})],
        {"actresses": {"id": "seehorn_rhea"}},
        id="test_legacy_input_state",
    ),
    ...
],
```
```python
        {"actors": {"created_at": "1962-10-22"}, "actresses": {"id": "seehorn_rhea"}},
    ),
    ("test_initialize_empty_mapping_by_default", {}, {}),
    ("test_initialize_empty_state", [], {}),
```
Do you think we could add a case with an `input_state` which is neither a MutableMapping nor a list of `AirbyteStateMessage`, and assert that an error is raised?
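One way to cover that case, sketched against a hypothetical stand-in for the deserialization under test (in the real suite this would be an extra parametrized row with an expected error):

```python
from typing import Any, List


def to_state_messages(state_obj: Any) -> List[dict]:
    """Hypothetical stand-in for the state parsing logic under test."""
    if not state_obj:
        return []
    if isinstance(state_obj, list):
        return [dict(state) for state in state_obj]
    if isinstance(state_obj, dict):
        return [{"type": "LEGACY", "data": state_obj}]
    # Neither a legacy mapping nor a per-stream list: reject it.
    raise ValueError(f"Could not deserialize state of type {type(state_obj)}")
```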
```python
if expected_error:
    with pytest.raises(expected_error):
        source.read_state(state_file.name)
else:
    actual = source.read_state(state_file.name)
    assert actual == expected_state
```
nit: What do you think about using `does_not_raise` for cleaner conditional raising?
Suggested change:

```python
# If expected_error == does_not_raise, pytest will not expect an error to be raised and will continue the test execution
with pytest.raises(expected_error):
    source.read_state(state_file.name)
actual = source.read_state(state_file.name)
assert actual == expected_state
```
I'm not entirely sure I understand the code suggestion you posted. As written, `pytest.raises(expected_error)` expects an exception type, so the test will fail on any happy-path case since `expected_error` is None. Wouldn't these tests also fail, since without the condition we'd always be expecting some type of error, which isn't the case for valid state input?
Actually, reading more about this: do you mean that for all happy-path test parameters we use `does_not_raise()`, and when checking for errors, `pytest.raises(Exception)`? With code looking something like this:

```python
with expected_error:
    actual = source.read_state(state_file.name)
    assert actual == expected_state
```
Exactly! (example here)
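The agreed pattern can be sketched without pytest so it is self-contained (`does_not_raise` is just `contextlib.nullcontext`; with pytest, the error rows would pass `pytest.raises(ValueError)` as the parameter instead, and `parse_state` here is a hypothetical stand-in for `source.read_state`):

```python
from contextlib import nullcontext as does_not_raise


def parse_state(state_obj):
    """Hypothetical stand-in for source.read_state."""
    if isinstance(state_obj, (list, dict)):
        return state_obj
    raise ValueError("unsupported state input")


def run_case(state_obj, expectation, expected_state):
    # Each parametrized row supplies its own expectation context:
    # does_not_raise() for happy paths, pytest.raises(...) for error rows.
    with expectation:
        actual = parse_state(state_obj)
        assert actual == expected_state
```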
…te (airbytehq#16205)

* interpret legacy and new per-stream format into AirbyteStateMessages
* add ConnectorStateManager stubs for future work
* remove frozen for the time being until we need to hash descriptors
* add validation that AirbyteStateMessage has at least one of stream, global, or data fields
* pr feedback and clean up of the code
* remove changes to airbyte_protocol and perform validation in read_state()
* fix import formatting
What
Adds support for deserializing incoming stream state as the legacy format (JSON object) and the new per-stream format (List of AirbyteStateMessage). This all gets packaged up to initialize a ConnectorStateManager which will be necessary for the next tasks. These changes are a no-op for the existing flow and do not currently account for interpreting the per-stream format. That will come in a subsequent task.
How
Rather than pass around either the legacy JSON blob or the new per-stream state list to all downstream components, in entrypoint.py/source.py we consolidate state input into the new format only. This should simplify the mental model when we invoke `abstract_source.py:read()`. From this point forward, `List[AirbyteStateMessage]` is what should be passed to `abstract_source.py`. Incoming legacy/global state will always be parsed into a list of at least one element. If legacy state is empty, or the state is per-stream, it will be parsed into a list of zero or more elements. You can refer to the tests to see the various scenarios. `abstract_source.py` consolidates the storage and retrieval of the legacy state, but the behavior remains the same in what is passed to `_read_stream()` and the subsequent `_read_incremental()`.
.Testing
Reinstalled the local version of the CDK on the sendgrid connector and tried various incremental syncs with varying input.
Recommended reading order
source.py
connector_state_manager.py
abstract_source.py