Skip to content

Commit

Permalink
Remove withRefreshedCatalog param from updateConnection endpoint (#14477
Browse files Browse the repository at this point in the history
)

* New webBackendConnectionUpdate endpoint with withRefreshedCatalog param removed
  • Loading branch information
alovew authored Jul 12, 2022
1 parent c87068b commit 767992e
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.airbyte.metrics.lib.MetricQueries;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand Down Expand Up @@ -989,4 +990,10 @@ public List<StreamDescriptor> getAllStreamsForConnection(final UUID connectionId
return CatalogHelpers.extractStreamDescriptors(standardSync.getCatalog());
}

public ConfiguredAirbyteCatalog getConfiguredCatalogForConnection(final UUID connectionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSync standardSync = getStandardSync(connectionId);
return standardSync.getCatalog();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,11 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate));
}

// @Override
public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendConnectionUpdate webBackendConnectionUpdate) {
return execute(() -> webBackendConnectionsHandler.webBackendUpdateConnectionNew(webBackendConnectionUpdate));
}

@Override
public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) {
return execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ public static StreamDescriptor streamDescriptorToApi(final io.airbyte.protocol.m
return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace());
}

public static io.airbyte.protocol.models.StreamDescriptor streamDescriptorToProtocol(final StreamDescriptor apiStreamDescriptor) {
return new io.airbyte.protocol.models.StreamDescriptor().withName(apiStreamDescriptor.getName())
.withNamespace(apiStreamDescriptor.getNamespace());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public ConnectionRead getConnection(final UUID connectionId)
return buildConnectionRead(connectionId);
}

public static CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
public CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
return new CatalogDiff().transforms(CatalogHelpers.getCatalogDiff(
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(oldCatalog)),
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(newCatalog)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.generated.SourceIdRequestBody;
import io.airbyte.api.model.generated.SourceRead;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.api.model.generated.WebBackendConnectionCreate;
import io.airbyte.api.model.generated.WebBackendConnectionRead;
import io.airbyte.api.model.generated.WebBackendConnectionReadList;
Expand All @@ -50,7 +52,11 @@
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.server.converters.ProtocolConverters;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.io.IOException;
Expand Down Expand Up @@ -252,7 +258,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
* but was present at time of configuration will appear in the diff as an added stream which is
* confusing. We need to figure out why source_catalog_id is not always populated in the db.
*/
diff = ConnectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(configuredCatalog), refreshedCatalog.get().getCatalog());
diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(configuredCatalog), refreshedCatalog.get().getCatalog());
} else if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// reconstructs a full picture of the full schema at the time the catalog was configured.
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get());
Expand Down Expand Up @@ -371,6 +377,45 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
verifyManualOperationResult(manualOperationResult);
connectionRead = connectionsHandler.getConnection(connectionUpdate.getConnectionId());
}
return buildWebBackendConnectionRead(connectionRead);
}

public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendConnectionUpdate webBackendConnectionUpdate)
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<UUID> operationIds = updateOperations(webBackendConnectionUpdate);
final ConnectionUpdate connectionUpdate = toConnectionUpdate(webBackendConnectionUpdate, operationIds);

final UUID connectionId = webBackendConnectionUpdate.getConnectionId();

final ConfiguredAirbyteCatalog existingConfiguredCatalog =
configRepository.getConfiguredCatalogForConnection(connectionId);
final io.airbyte.protocol.models.AirbyteCatalog existingCatalog = CatalogHelpers.configuredCatalogToCatalog(existingConfiguredCatalog);
final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(existingCatalog);
final AirbyteCatalog newAirbyteCatalog = webBackendConnectionUpdate.getSyncCatalog();
final CatalogDiff catalogDiff = connectionsHandler.getDiff(apiExistingCatalog, newAirbyteCatalog);

final List<StreamDescriptor> apiStreamsToReset = getStreamsToReset(catalogDiff);
List<io.airbyte.protocol.models.StreamDescriptor> streamsToReset =
apiStreamsToReset.stream().map(ProtocolConverters::streamDescriptorToProtocol).toList();

ConnectionRead connectionRead;
connectionRead = connectionsHandler.updateConnection(connectionUpdate);

if (!streamsToReset.isEmpty()) {
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(connectionId);
final ConnectionStateType stateType = getStateType(connectionIdRequestBody);

if (stateType == ConnectionStateType.LEGACY || stateType == ConnectionStateType.NOT_SET || stateType == ConnectionStateType.GLOBAL) {
streamsToReset = configRepository.getAllStreamsForConnection(connectionId);
}
ManualOperationResult manualOperationResult = eventRunner.synchronousResetConnection(
webBackendConnectionUpdate.getConnectionId(),
streamsToReset);
verifyManualOperationResult(manualOperationResult);
manualOperationResult = eventRunner.startNewManualSync(webBackendConnectionUpdate.getConnectionId());
verifyManualOperationResult(manualOperationResult);
connectionRead = connectionsHandler.getConnection(connectionUpdate.getConnectionId());
}

return buildWebBackendConnectionRead(connectionRead);
}
Expand Down Expand Up @@ -488,6 +533,11 @@ protected static ConnectionSearch toConnectionSearch(final WebBackendConnectionS
.status(webBackendConnectionSearch.getStatus());
}

@VisibleForTesting
static List<StreamDescriptor> getStreamsToReset(final CatalogDiff catalogDiff) {
return catalogDiff.getTransforms().stream().map(StreamTransform::getStreamDescriptor).toList();
}

/**
* Equivalent to {@see io.airbyte.integrations.base.AirbyteStreamNameNamespacePair}. Intentionally
* not using that class because it doesn't make sense for airbyte-server to depend on
Expand Down
Loading

0 comments on commit 767992e

Please sign in to comment.