
Remove withRefreshedCatalog param from updateConnection endpoint #14477

Merged: 18 commits, Jul 12, 2022
@@ -46,6 +46,7 @@
import io.airbyte.metrics.lib.MetricQueries;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
@@ -989,4 +990,10 @@ public List<StreamDescriptor> getAllStreamsForConnection(final UUID connectionId
return CatalogHelpers.extractStreamDescriptors(standardSync.getCatalog());
}

public ConfiguredAirbyteCatalog getConfiguredCatalogForConnection(final UUID connectionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSync standardSync = getStandardSync(connectionId);
return standardSync.getCatalog();
}

}
@@ -822,6 +822,11 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate));
}

// @Override
Contributor: I don't think this should be commented out.

public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendConnectionUpdate webBackendConnectionUpdate) {
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnectionNew(webBackendConnectionUpdate));
}

@Override
public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody));
@@ -15,4 +15,9 @@ public static StreamDescriptor streamDescriptorToApi(final io.airbyte.protocol.m
return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace());
}

public static io.airbyte.protocol.models.StreamDescriptor streamDescriptorToProtocol(final StreamDescriptor apiStreamDescriptor) {
return new io.airbyte.protocol.models.StreamDescriptor().withName(apiStreamDescriptor.getName())
.withNamespace(apiStreamDescriptor.getNamespace());
}

}
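The converter pair above is a simple fluent-setter round trip between the API model and the protocol model. A minimal self-contained sketch (with stand-in descriptor classes, since the generated Airbyte models are not reproduced here; all names below are illustrative) shows that name and namespace survive the round trip:

```java
// Stand-in for the generated API model (fluent setters named after the field).
class ApiStreamDescriptor {
    private String name;
    private String namespace;
    ApiStreamDescriptor name(final String n) { this.name = n; return this; }
    ApiStreamDescriptor namespace(final String ns) { this.namespace = ns; return this; }
    String getName() { return name; }
    String getNamespace() { return namespace; }
}

// Stand-in for the protocol model (fluent with* setters).
class ProtocolStreamDescriptor {
    private String name;
    private String namespace;
    ProtocolStreamDescriptor withName(final String n) { this.name = n; return this; }
    ProtocolStreamDescriptor withNamespace(final String ns) { this.namespace = ns; return this; }
    String getName() { return name; }
    String getNamespace() { return namespace; }
}

public class ProtocolConvertersSketch {

    // Mirrors streamDescriptorToProtocol in the diff above.
    static ProtocolStreamDescriptor toProtocol(final ApiStreamDescriptor api) {
        return new ProtocolStreamDescriptor().withName(api.getName()).withNamespace(api.getNamespace());
    }

    // Mirrors streamDescriptorToApi in the diff above.
    static ApiStreamDescriptor toApi(final ProtocolStreamDescriptor proto) {
        return new ApiStreamDescriptor().name(proto.getName()).namespace(proto.getNamespace());
    }

    public static void main(final String[] args) {
        final ProtocolStreamDescriptor proto =
            toProtocol(new ApiStreamDescriptor().name("users").namespace("public"));
        final ApiStreamDescriptor roundTripped = toApi(proto);
        // The round trip preserves both identifying fields.
        System.out.println(roundTripped.getNamespace() + "." + roundTripped.getName()); // prints public.users
    }
}
```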
@@ -259,7 +259,7 @@ public ConnectionRead getConnection(final UUID connectionId)
return buildConnectionRead(connectionId);
}

public static CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
public CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
return new CatalogDiff().transforms(CatalogHelpers.getCatalogDiff(
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(oldCatalog)),
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(newCatalog)))
@@ -35,6 +35,8 @@
import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.generated.SourceIdRequestBody;
import io.airbyte.api.model.generated.SourceRead;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.api.model.generated.WebBackendConnectionCreate;
import io.airbyte.api.model.generated.WebBackendConnectionRead;
import io.airbyte.api.model.generated.WebBackendConnectionReadList;
@@ -50,7 +52,11 @@
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.server.converters.ProtocolConverters;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.io.IOException;
@@ -252,7 +258,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
* but was present at time of configuration will appear in the diff as an added stream which is
* confusing. We need to figure out why source_catalog_id is not always populated in the db.
*/
diff = ConnectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(configuredCatalog), refreshedCatalog.get().getCatalog());
diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(configuredCatalog), refreshedCatalog.get().getCatalog());
} else if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// reconstructs a full picture of the full schema at the time the catalog was configured.
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get());
@@ -371,6 +377,45 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
verifyManualOperationResult(manualOperationResult);
connectionRead = connectionsHandler.getConnection(connectionUpdate.getConnectionId());
}
return buildWebBackendConnectionRead(connectionRead);
}

public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendConnectionUpdate webBackendConnectionUpdate)
Contributor: Currently, the withRefreshedCatalog flag is still part of the input. Would it make sense to fail hard if it is set, instead of ignoring it?

Contributor (Author): I'm not sure we want to fail, since extra parameters don't really hurt anything.

Contributor: It is probably fine for now. Once we consolidate the endpoints, we should clean it up, or at least mark it as obsolete; it will save someone some headaches trying to understand why we have these useless options. For example, I have been scratching my head over how we currently have three different options to pass a catalog and how each affects the response.
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<UUID> operationIds = updateOperations(webBackendConnectionUpdate);
final ConnectionUpdate connectionUpdate = toConnectionUpdate(webBackendConnectionUpdate, operationIds);

final UUID connectionId = webBackendConnectionUpdate.getConnectionId();

final ConfiguredAirbyteCatalog existingConfiguredCatalog =
configRepository.getConfiguredCatalogForConnection(connectionId);
final io.airbyte.protocol.models.AirbyteCatalog existingCatalog = CatalogHelpers.configuredCatalogToCatalog(existingConfiguredCatalog);
final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(existingCatalog);
final AirbyteCatalog newAirbyteCatalog = webBackendConnectionUpdate.getSyncCatalog();
final CatalogDiff catalogDiff = connectionsHandler.getDiff(apiExistingCatalog, newAirbyteCatalog);
Contributor: I was just reading through the connectionsHandler.getDiff() implementation, and it looks like the first thing it does is convert the "configured catalogs" to plain "catalogs", which don't contain any of the configured options, such as the destination sync mode the user wants to use (overwrite/append) or the cursor field they have selected for incremental syncs.

I think this is a problem, because it means that if, for example, a user just changes the cursor field of their stream, we will not perform a reset of that stream, because that configuration is lost when we convert from configured catalog to catalog. This could lead to strange issues, like the source trying to use the cursor value of one column as the cursor of another, and could be very tough to fix.

We probably want to keep the current diff logic for returning the diff to the frontend when a user refreshes the source schema, but I think the above issue means we need a second diff calculation method that also compares the configured fields of the streams, so that we also perform a reset in cases where only the configuration of a stream is changed.

FYI @benmoriceau and @cgardens as the original implementer of the diff logic, does the above sound correct to you?
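The configuration-aware diff being proposed can be sketched in isolation. This is a hypothetical illustration, not Airbyte's actual API: the ConfiguredStream record and configuredFieldsDiff method are invented stand-ins showing how a cursor-field change is visible only when the configured fields are compared.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for one stream's configuration (name and fields are
// illustrative; the real models live in io.airbyte.protocol.models).
record ConfiguredStream(String name, String syncMode, List<String> cursorField) {}

public class ConfiguredDiffSketch {

    // Returns names of streams whose *configuration* changed. A schema-only diff
    // (computed after stripping the catalogs down to plain catalogs) would report
    // no change for these, which is the gap described in the comment above.
    static List<String> configuredFieldsDiff(final Map<String, ConfiguredStream> oldStreams,
                                             final Map<String, ConfiguredStream> newStreams) {
        final List<String> changed = new ArrayList<>();
        for (final var entry : newStreams.entrySet()) {
            final ConfiguredStream before = oldStreams.get(entry.getKey());
            final ConfiguredStream after = entry.getValue();
            if (before != null
                && (!Objects.equals(before.syncMode(), after.syncMode())
                    || !Objects.equals(before.cursorField(), after.cursorField()))) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    public static void main(final String[] args) {
        // Same schema, but the user switched the cursor column: a schema-only diff
        // is empty, yet this stream should be reset.
        final var before = Map.of("users", new ConfiguredStream("users", "incremental", List.of("updated_at")));
        final var after = Map.of("users", new ConfiguredStream("users", "incremental", List.of("created_at")));
        System.out.println(configuredFieldsDiff(before, after)); // prints [users]
    }
}
```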

Contributor: The current behavior is to propose a reset if the cursor changes (probably also true i). Should we also keep it optional? When a sync mode is changed (e.g. from full refresh to incremental) the cursor will also change, but we might not want to perform a reset in that case. We can calculate the diff here, but I am not sure about the expected behavior regarding the reset.

Contributor: This is addressed in #14626.


final List<StreamDescriptor> apiStreamsToReset = getStreamsToReset(catalogDiff);
List<io.airbyte.protocol.models.StreamDescriptor> streamsToReset =
apiStreamsToReset.stream().map(ProtocolConverters::streamDescriptorToProtocol).toList();

ConnectionRead connectionRead;
connectionRead = connectionsHandler.updateConnection(connectionUpdate);

if (!streamsToReset.isEmpty()) {
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(connectionId);
final ConnectionStateType stateType = getStateType(connectionIdRequestBody);

if (stateType == ConnectionStateType.LEGACY || stateType == ConnectionStateType.NOT_SET || stateType == ConnectionStateType.GLOBAL) {
Contributor (@lmossman, Jul 8, 2022):
The plan was to change this to remove the || stateType == ConnectionStateType.GLOBAL once the postgres source is updated with the logic for handling resets of individual streams in the CDC case (I think this PR), right?

I am concerned this could lead to some issues, such as the following scenario:

  • A user updates their platform to have the change to remove || stateType == ConnectionStateType.GLOBAL from this line of code
  • This user is still using the version of the postgres source that emits GLOBAL state for CDC, but does not have that individual stream reset change (i.e. the current version of the postgres source 0.4.30)
  • The user then performs an action that results in a partial reset of the connection

In this case, since the postgres source doesn't have that new logic, it could lead to bad behavior, e.g. the source just continuing to sync all streams from the current cdc cursor, or not syncing the streams that were reset at all.

@subodh1810 @benmoriceau @gosusnp FYI, I'm not super sure how we should address this issue

Contributor:

> This user is still using the version of the postgres source that emits GLOBAL state for CDC, but does not have that individual stream reset change (i.e. the current version of the postgres source 0.4.30)
@lmossman can we force them to migrate to the right postgres connector with the seed file? If they don't downgrade it should ensure that we will use the right version. There might still be an issue with companies that are using the public connectors as a custom one. I think that datadog is doing that.

Contributor:

Yeah that's a good point, I had thought that we had logic to prevent updating a connector if it was actively being used in a deployment, but it looks like we actually do update connectors even if they are in use

if (connectorRepositoriesInUse.contains(repository)) {
  final String latestImageTag = latestDefinition.get("dockerImageTag").asText();
  if (hasNewPatchVersion(connectorInfo.dockerImageTag, latestImageTag)) {
    // Update connector to the latest patch version
    LOGGER.info("Connector {} needs update: {} vs {}", repository, connectorInfo.dockerImageTag, latestImageTag);
    writeOrUpdateStandardDefinition(ctx, configType, latestDefinition);

For the custom connector case, we may just have to try to communicate that those will have to be rebuilt on the latest version

Contributor (@lmossman, Jul 8, 2022):

Also, one more thing I think we discussed is that we probably want to allow the diff result streamsToReset to be used when the state type is NOT_SET, rather than just resetting all streams for the connection.

The state type will be NOT_SET if the connection is a non-incremental one or if nothing has been synced yet. Whether we reset all streams for the connection or just streamsToReset doesn't matter for the resulting state (EmptyAirbyteSource will just emit a null state in either case, since there is no current state; code here). But using streamsToReset vs getAllStreamsForConnection does affect what will happen to the data in the destination, and it seems useful to only clear out the data for the streams that are being updated/added, rather than clearing out all data.

@benmoriceau or @andyjih do you agree with the above?

Contributor:

If streamsToReset only includes the streams that were updated/added, then yes we should only update the relevant streams and not all streams.

Contributor:

streamsToReset will include all streams that are updated, added, or deleted. But we will only end up clearing out the data for streams that are updated or added in the destination, because those are the only ones that are still in the catalog after it is updated in this endpoint. Streams that aren't in the catalog are not modified in the destination, so deleted streams won't have their destination data affected

Contributor (Author):

Wouldn't this be handled correctly by the empty airbyte source reset whether we pass in streamsToReset or all streams in a connection?

Contributor:

The EmptyAirbyteSource will act the same way in either case - it will output no state, since none exists; the difference is in this logic where we decide whether to set the destination sync mode of a stream in the catalog to OVERWRITE or APPEND based on whether the stream is present in streamsToReset:

if (streamsToReset.contains(streamDescriptor)) {
  // The Reset Source will emit no record messages for any streams, so setting the destination sync
  // mode to OVERWRITE will empty out this stream in the destination.
  // Note: streams in streamsToReset that are NOT in this configured catalog (i.e. deleted streams)
  // will still have their state reset by the Reset Source, but will not be modified in the
  // destination since they are not present in the catalog that is sent to the destination.
  configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
} else {
  // Set streams that are not being reset to APPEND so that they are not modified in the destination
  configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND);

What I'm proposing here is that when the state is NOT_SET, we probably only want to set the streams in the diff result to OVERWRITE, so that only those streams are cleared out in the destination
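The selection logic quoted above can be condensed into a runnable sketch. Stream names stand in for full StreamDescriptors, and the class and method names are illustrative, not Airbyte's actual API: each stream in the catalog sent to the destination gets OVERWRITE if it is being reset (emptying it) and APPEND otherwise (leaving its data untouched).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ResetSyncModeSketch {

    enum DestinationSyncMode { OVERWRITE, APPEND }

    // For each stream in the configured catalog, pick the destination sync mode
    // based on membership in streamsToReset, mirroring the branch quoted above.
    static Map<String, DestinationSyncMode> chooseSyncModes(final List<String> catalogStreams,
                                                            final Set<String> streamsToReset) {
        final Map<String, DestinationSyncMode> modes = new LinkedHashMap<>();
        for (final String stream : catalogStreams) {
            modes.put(stream, streamsToReset.contains(stream)
                ? DestinationSyncMode.OVERWRITE
                : DestinationSyncMode.APPEND);
        }
        return modes;
    }

    public static void main(final String[] args) {
        // "users" is in the diff, "orders" is not; "legacy" was deleted, so it is in
        // streamsToReset but absent from the catalog, and its destination data is untouched.
        System.out.println(chooseSyncModes(List.of("users", "orders"), Set.of("users", "legacy")));
        // prints {users=OVERWRITE, orders=APPEND}
    }
}
```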

Contributor:

  • If it is FULL_REFRESH/OVERWRITE, the next sync will also erase everything (it will be a full refresh). Either we do it in the reset or in the next sync (which is automatically scheduled), so it shouldn't be an issue.
  • If it is full refresh append, we should use the diff.
  • If it is a new connection, we should also erase.

I wonder how many FULL_REFRESH APPEND streams are present. This will be tricky because the sync mode is per stream and not per connection...

Contributor:

@benmoriceau I don't think the current sync mode configuration of the connection matters when we are performing a reset, since from the code I linked above you can see that we are choosing which sync mode to use for each stream in the catalog when performing a reset. So what I was saying was that in the case that we are updating a connection whose state is currently NOT_SET, then we can just use the results of the diff to decide which streams should be reset, and the above code will set those to FULL_REFRESH/OVERWRITE, and have the rest of the streams (the ones without any diff) be set to FULL_REFRESH/APPEND. That way, only the streams in the diff will have their data cleared in the destination, and others will be unaffected.

And all of the above can be accomplished by just setting streamsToReset to the results of the diff in the case that the current connection state is NOT_SET

Contributor:

@alovew since we are now adding this as a separate endpoint, I think it probably makes sense to just make the change now to remove NOT_SET and GLOBAL from this if statement, since that is what we will want in the final state

streamsToReset = configRepository.getAllStreamsForConnection(connectionId);
}
ManualOperationResult manualOperationResult = eventRunner.synchronousResetConnection(
webBackendConnectionUpdate.getConnectionId(),
streamsToReset);
verifyManualOperationResult(manualOperationResult);
manualOperationResult = eventRunner.startNewManualSync(webBackendConnectionUpdate.getConnectionId());
verifyManualOperationResult(manualOperationResult);
connectionRead = connectionsHandler.getConnection(connectionUpdate.getConnectionId());
}

return buildWebBackendConnectionRead(connectionRead);
}
@@ -488,6 +533,11 @@ protected static ConnectionSearch toConnectionSearch(final WebBackendConnectionS
.status(webBackendConnectionSearch.getStatus());
}

@VisibleForTesting
static List<StreamDescriptor> getStreamsToReset(final CatalogDiff catalogDiff) {
return catalogDiff.getTransforms().stream().map(StreamTransform::getStreamDescriptor).toList();
}
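The helper above maps every transform in the diff (added, removed, or updated streams alike) to its stream descriptor. A self-contained sketch with hypothetical stand-in records for the generated CatalogDiff and StreamTransform models (the record names and fields below are illustrative, not the generated API):

```java
import java.util.List;

// Stand-ins for the generated API models, kept deliberately minimal.
record StreamDescriptor(String name, String namespace) {}
record StreamTransform(String transformType, StreamDescriptor streamDescriptor) {}
record CatalogDiff(List<StreamTransform> transforms) {}

public class StreamsToResetSketch {

    // Mirrors getStreamsToReset above: every transform contributes its descriptor,
    // regardless of whether the stream was added, removed, or updated.
    static List<StreamDescriptor> getStreamsToReset(final CatalogDiff catalogDiff) {
        return catalogDiff.transforms().stream().map(StreamTransform::streamDescriptor).toList();
    }

    public static void main(final String[] args) {
        final CatalogDiff diff = new CatalogDiff(List.of(
            new StreamTransform("update_stream", new StreamDescriptor("users", "public")),
            new StreamTransform("add_stream", new StreamDescriptor("orders", "public"))));
        System.out.println(getStreamsToReset(diff).size()); // prints 2
    }
}
```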

/**
* Equivalent to {@see io.airbyte.integrations.base.AirbyteStreamNameNamespacePair}. Intentionally
* not using that class because it doesn't make sense for airbyte-server to depend on