-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove withRefreshedCatalog param from updateConnection endpoint #14477
Conversation
4e2257b
to
e159022
Compare
e159022
to
eb5b770
Compare
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(connectionId); | ||
final ConnectionStateType stateType = getStateType(connectionIdRequestBody); | ||
|
||
if (stateType == ConnectionStateType.LEGACY || stateType == ConnectionStateType.NOT_SET || stateType == ConnectionStateType.GLOBAL) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The plan was to change this to remove the || stateType == ConnectionStateType.GLOBAL
once the postgres source is updated with the logic for handling resets of individual streams in the CDC case (I think this PR), right?
I am concerned this could lead to some issues, such as the following scenario:
- A user updates their platform to have the change to remove
|| stateType == ConnectionStateType.GLOBAL
from this line of code - This user is still using the version of the postgres source that emits GLOBAL state for CDC, but does not have that individual stream reset change (i.e. the current version of the postgres source 0.4.30)
- The user then performs an action that results in a partial reset of the connection
In this case, since the postgres source doesn't have that new logic, it could lead to bad behavior, e.g. the source just continuing to sync all streams from the current cdc cursor, or not syncing the streams that were reset at all.
@subodh1810 @benmoriceau @gosusnp FYI, I'm not super sure how we should address this issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This user is still using the version of the postgres source that emits GLOBAL state for CDC, but does not have that individual stream reset change (i.e. the current version of the postgres source 0.4.30)
@lmossman can we force them to migrate to the right postgres connector with the seed file? If they don't downgrade it should ensure that we will use the right version. There might still be an issue with companies that are using the public connectors as a custom one. I think that datadog is doing that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's a good point, I had thought that we had logic to prevent updating a connector if it was actively being used in a deployment, but it looks like we actually do update connectors even if they are in use
Lines 1812 to 1817 in 900c212
if (connectorRepositoriesInUse.contains(repository)) { | |
final String latestImageTag = latestDefinition.get("dockerImageTag").asText(); | |
if (hasNewPatchVersion(connectorInfo.dockerImageTag, latestImageTag)) { | |
// Update connector to the latest patch version | |
LOGGER.info("Connector {} needs update: {} vs {}", repository, connectorInfo.dockerImageTag, latestImageTag); | |
writeOrUpdateStandardDefinition(ctx, configType, latestDefinition); |
For the custom connector case, we may just have to try to communicate that those will have to be rebuilt on the latest version
eb5b770
to
cb7780b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the changes here look okay, but there are a couple of things I've realized about these changes that could break things which we should address before we move forward with this. See my comments above and below for the descriptions of these issues
final io.airbyte.protocol.models.AirbyteCatalog existingCatalog = CatalogHelpers.configuredCatalogToCatalog(existingConfiguredCatalog); | ||
final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(existingCatalog); | ||
final AirbyteCatalog newAirbyteCatalog = webBackendConnectionUpdate.getSyncCatalog(); | ||
final CatalogDiff catalogDiff = connectionsHandler.getDiff(apiExistingCatalog, newAirbyteCatalog); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was just reading through the connectionsHandler.getDiff()
implementation, and it looks like it the first thing it does is convert the "configured catalogs" to just normal "catalogs", which don't contain any of the configured options like the destination sync mode the user wants to use (overwrite/append), or the cursor field that they have selected to use for incremental syncs.
I think this is a problem, because it means that if, for example, a user just changes the cursor field of their stream, we will not perform a reset of that stream because that configuration is lost when we convert from configured catalog to catalog. This could lead to strange issues like the source trying to use the cursor value of one column as the cursor of another, and could be very tough to fix.
We probably want to keep the current diff logic for returning the diff to the frontend for showing the changes when a user refreshes source schema, but I think the above issue means we need to have a second diff calculation method that compares the configured fields of the streams as well, so that we also perform a reset in cases where only the configuration of a stream is changed.
FYI @benmoriceau and @cgardens as the original implementer of the diff logic, does the above sound correct to you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current behavior is to propose to perform a reset if the cursor change (probably also true i). Should we also keep it as optional as well? When a sync mode is changed (e.g.: from full refresh to incremental) the cursor will also change but we might not want to perform a reset in this case. We can calculate the diff here but I am not sure about the expected behavior regarding the reset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is address in #14626
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(connectionId); | ||
final ConnectionStateType stateType = getStateType(connectionIdRequestBody); | ||
|
||
if (stateType == ConnectionStateType.LEGACY || stateType == ConnectionStateType.NOT_SET || stateType == ConnectionStateType.GLOBAL) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, one more thing I think we discussed is that we probably want to allow the diff result streamsToReset
to be used when the state type is NOT_SET
, rather than just resetting all streams for the connection.
The state type will be NOT_SET
if the connection is a non-incremental one or if nothing has been synced yet. Whether we reset all streams for the connection or just streamsToReset
doesn't matter for the resulting state
(EmptyAirbyteSource will just emit a null state in either case, since there is not current state - code here). But using streamsToReset
vs getAllStreamsForConnection
does affect what will happen to the data in the destination. And it seems useful to only clear out the data for the streams that are being updated/added, rather than clearing out all data.
@benmoriceau or @andyjih do you agree with the above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If streamsToReset
only includes the streams that were updated/added, then yes we should only update the relevant streams and not all streams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streamsToReset
will include all streams that are updated, added, or deleted. But we will only end up clearing out the data for streams that are updated or added in the destination, because those are the only ones that are still in the catalog after it is updated in this endpoint. Streams that aren't in the catalog are not modified in the destination, so deleted streams won't have their destination data affected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this be handled correctly by the empty airbyte source reset whether we pass in streamsToReset or all streams in a connection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The EmptyAirbyteSource will act the same way in either case - it will output no state, since none exists; the difference is in this logic where we decide whether to set the destination sync mode of a stream in the catalog to OVERWRITE
or APPEND
based on whether the stream is present in streamsToReset
:
Lines 102 to 111 in 4e80a67
if (streamsToReset.contains(streamDescriptor)) { | |
// The Reset Source will emit no record messages for any streams, so setting the destination sync | |
// mode to OVERWRITE will empty out this stream in the destination. | |
// Note: streams in streamsToReset that are NOT in this configured catalog (i.e. deleted streams) | |
// will still have their state reset by the Reset Source, but will not be modified in the | |
// destination since they are not present in the catalog that is sent to the destination. | |
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE); | |
} else { | |
// Set streams that are not being reset to APPEND so that they are not modified in the destination | |
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND); |
What I'm proposing here is that when the state is NOT_SET
, we probably only want to set the streams in the diff result to OVERWRITE
, so that only those streams are cleared out in the destination
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- If it is a FULL_REFRESH/OVERRIDE, the next sync will also erase all (it will be a full refresh). Either we do it in the reset or in the next sync (which is automatically schedule). So it shouldn't be an issue.
- If it is a full refresh append, we should use the diff.
- If it is a new connection we should also erase.
I wonder how many FULL_REFRESH APPEND are present. This will be tricky because the sync mode is per stream and not per connection...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benmoriceau I don't think the current sync mode configuration of the connection matters when we are performing a reset, since from the code I linked above you can see that we are choosing which sync mode to use for each stream in the catalog when performing a reset. So what I was saying was that in the case that we are updating a connection whose state is currently NOT_SET
, then we can just use the results of the diff to decide which streams should be reset, and the above code will set those to FULL_REFRESH/OVERWRITE
, and have the rest of the streams (the ones without any diff) be set to FULL_REFRESH/APPEND
. That way, only the streams in the diff will have their data cleared in the destination, and others will be unaffected.
And all of the above can be accomplished by just setting streamsToReset
to the results of the diff in the case that the current connection state is NOT_SET
cb7780b
to
b24ae1c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, a couple cosmetic comments.
Since we changed the scope, might be worth updating title/description for anyone that may read this later on. Also, we are no longer removing the flag, instead we add a new endpoint that doesn't use the withRefreshedCatalog.
@@ -1260,6 +1260,27 @@ paths: | |||
summary: Update a connection | |||
tags: | |||
- web_backend | |||
/v1/web_backend/connections/updateNew: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even though this should be temporary, does it make sense to name it updatePerStream
rather than new
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it ok if we leave as is just to get this in?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, forgot to tap the comment as optional
return buildWebBackendConnectionRead(connectionRead); | ||
} | ||
|
||
public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendConnectionUpdate webBackendConnectionUpdate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, the withRefreshedCatalog
flag is still part of the input. Would it make sense to fail hard if it is set instead of ignoring it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we want to fail since extra parameters don't really hurt anything
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is probably fine for now, once we consolidate the endpoints, we should clean it up or at least mark it as obsolete. It will save someone some headaches trying to understand why we have the some useless options.
For example, I have been scratching my head over how we currently have three different option to pass a catalog and how it affects the response.
@@ -822,6 +822,11 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne | |||
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate)); | |||
} | |||
|
|||
// @Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this should be commented out
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(connectionId); | ||
final ConnectionStateType stateType = getStateType(connectionIdRequestBody); | ||
|
||
if (stateType == ConnectionStateType.LEGACY || stateType == ConnectionStateType.NOT_SET || stateType == ConnectionStateType.GLOBAL) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alovew since we are now adding this as a separate endpoint, I think it probably makes sense to just make the change now to remove NOT_SET
and GLOBAL
from this if statement, since that is what we will want in the final state
skipReset: | ||
type: boolean |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like you are adding this skipReset
property in this PR, but the new endpoint is not actually referencing this new property anywhere. You are planning to add that logic to reference this property in a follow-up PR, right? Would it make sense to just wait until that PR to add this? Not a huge deal either way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes was planning to do this in a new PR
Instead of relying on a flag from the front end, the backend will now decide whether a connection needs a reset or not, so this PR removes the use of the withRefreshedCatalog param in the updateConnection endpoint.
The endpoint: