Skip to content

Commit

Permalink
add catalog diff connection read (#13918)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Jun 22, 2022
1 parent 48baf99 commit bbb340f
Show file tree
Hide file tree
Showing 13 changed files with 405 additions and 582 deletions.
3 changes: 3 additions & 0 deletions airbyte-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ task generateApiServer(type: GenerateTask) {
'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode',
'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode',
'StateBlob' : 'com.fasterxml.jackson.databind.JsonNode',
'FieldSchema' : 'com.fasterxml.jackson.databind.JsonNode',
]

generateApiDocumentation = false
Expand Down Expand Up @@ -71,6 +72,7 @@ task generateApiClient(type: GenerateTask) {
'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode',
'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode',
'StateBlob' : 'com.fasterxml.jackson.databind.JsonNode',
'FieldSchema' : 'com.fasterxml.jackson.databind.JsonNode',
]

library = "native"
Expand Down Expand Up @@ -104,6 +106,7 @@ task generateApiDocs(type: GenerateTask) {
'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode',
'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode',
'StateBlob' : 'com.fasterxml.jackson.databind.JsonNode',
'FieldSchema' : 'com.fasterxml.jackson.databind.JsonNode',
]

generateApiDocumentation = false
Expand Down
1 change: 0 additions & 1 deletion airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4140,7 +4140,6 @@ components:
FieldSchema:
description: JSONSchema representation of the field
type: object
additionalProperties: true
ActorDefinitionResourceRequirements:
description: actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overriden by the job type specific configurations. if not set, the platform will use defaults. these values will be overriden by configuration at the connection level.
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,64 @@ public static ConfiguredAirbyteStream createConfiguredAirbyteStream(final String
.withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE);
}

/**
* Converts a {@link ConfiguredAirbyteCatalog} into an {@link AirbyteCatalog}. This is possible
* because the latter is a subset of the former.
*
* @param configuredCatalog - catalog to convert
* @return - airbyte catalog
*/
public static AirbyteCatalog configuredCatalogToCatalog(final ConfiguredAirbyteCatalog configuredCatalog) {
return new AirbyteCatalog().withStreams(
configuredCatalog.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.collect(Collectors.toList()));
}

/**
* Extracts {@link StreamDescriptor} for a given {@link AirbyteStream}
*
* @param airbyteStream stream
* @return stream descriptor
*/
public static StreamDescriptor extractDescriptor(final ConfiguredAirbyteStream airbyteStream) {
return extractDescriptor(airbyteStream.getStream());
}

/**
* Extracts {@link StreamDescriptor} for a given {@link ConfiguredAirbyteStream}
*
* @param airbyteStream stream
* @return stream descriptor
*/
public static StreamDescriptor extractDescriptor(final AirbyteStream airbyteStream) {
return new StreamDescriptor().withName(airbyteStream.getName()).withNamespace(airbyteStream.getNamespace());
}

/**
* Extracts {@link StreamDescriptor}s for each stream in a given {@link ConfiguredAirbyteCatalog}
*
* @param configuredCatalog catalog
* @return list of stream descriptors
*/
public static List<StreamDescriptor> extractStreamDescriptors(final ConfiguredAirbyteCatalog configuredCatalog) {
return extractStreamDescriptors(configuredCatalogToCatalog(configuredCatalog));
}

/**
* Extracts {@link StreamDescriptor}s for each stream in a given {@link AirbyteCatalog}
*
* @param catalog catalog
* @return list of stream descriptors
*/
public static List<StreamDescriptor> extractStreamDescriptors(final AirbyteCatalog catalog) {
return catalog.getStreams()
.stream()
.map(abStream -> new StreamDescriptor().withName(abStream.getName()).withNamespace(abStream.getNamespace()))
.toList();
}

/**
* Convert a Catalog into a ConfiguredCatalog. This applies minimum default to the Catalog to make
* it a valid ConfiguredCatalog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,20 @@ public static FieldTransform createUpdateFieldTransform(final UpdateFieldTransfo
return new FieldTransform(FieldTransformType.UPDATE_FIELD, null, null, updateFieldTransform);
}

public FieldTransformType getTransformType() {
return transformType;
}

public AddFieldTransform getAddFieldTransform() {
return addFieldTransform;
}

public RemoveFieldTransform getRemoveFieldTransform() {
return removeFieldTransform;
}

public UpdateFieldTransform getUpdateFieldTransform() {
return updateFieldTransform;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ public class RemoveStreamTransform {

private final StreamDescriptor streamDescriptor;

public StreamDescriptor getStreamDescriptor() {
return streamDescriptor;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.converters;

import io.airbyte.api.model.generated.FieldNameAndSchema;
import io.airbyte.api.model.generated.FieldSchemaUpdate;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.commons.enums.Enums;
import io.airbyte.protocol.models.transform_models.FieldTransformType;
import io.airbyte.protocol.models.transform_models.StreamTransformType;
import java.util.List;
import java.util.Optional;

/**
* Utility methods for converting between internal and API representation of catalog diffs.
*/
public class CatalogDiffConverters {

public static StreamTransform streamTransformToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) {
return new StreamTransform()
.transformType(Enums.convertTo(transform.getTransformType(), StreamTransform.TransformTypeEnum.class))
.addStream(addStreamToApi(transform).orElse(null))
.removeStream(removeStreamToApi(transform).orElse(null))
.updateStream(updateStreamToApi(transform).orElse(null));
}

public static Optional<StreamDescriptor> addStreamToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) {
if (transform.getTransformType() == StreamTransformType.ADD_STREAM) {
return Optional.ofNullable(ProtocolConverters.streamDescriptorToApi(transform.getAddStreamTransform().getStreamDescriptor()));
} else {
return Optional.empty();
}
}

public static Optional<StreamDescriptor> removeStreamToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) {
if (transform.getTransformType() == StreamTransformType.REMOVE_STREAM) {
return Optional.ofNullable(ProtocolConverters.streamDescriptorToApi(transform.getRemoveStreamTransform().getStreamDescriptor()));
} else {
return Optional.empty();
}
}

public static Optional<List<FieldTransform>> updateStreamToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) {
if (transform.getTransformType() == StreamTransformType.UPDATE_STREAM) {
return Optional.ofNullable(transform.getUpdateStreamTransform()
.getFieldTransforms()
.stream()
.map(CatalogDiffConverters::fieldTransformToApi)
.toList());
} else {
return Optional.empty();
}
}

public static FieldTransform fieldTransformToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) {
return new FieldTransform()
.transformType(Enums.convertTo(transform.getTransformType(), FieldTransform.TransformTypeEnum.class))
.addField(addFieldToApi(transform).orElse(null))
.removeField(removeFieldToApi(transform).orElse(null))
.updateFieldSchema(updateFieldToApi(transform).orElse(null));
}

private static Optional<FieldNameAndSchema> addFieldToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) {
if (transform.getTransformType() == FieldTransformType.ADD_FIELD) {
return Optional.of(new FieldNameAndSchema()
.fieldName(transform.getAddFieldTransform().getFieldName())
.fieldSchema(transform.getAddFieldTransform().getSchema()));
} else {
return Optional.empty();
}
}

private static Optional<FieldNameAndSchema> removeFieldToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) {
if (transform.getTransformType() == FieldTransformType.REMOVE_FIELD) {
return Optional.of(new FieldNameAndSchema()
.fieldName(transform.getRemoveFieldTransform().getFieldName())
.fieldSchema(transform.getRemoveFieldTransform().getSchema()));
} else {
return Optional.empty();
}
}

private static Optional<FieldSchemaUpdate> updateFieldToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) {
if (transform.getTransformType() == FieldTransformType.UPDATE_FIELD) {
return Optional.of(new FieldSchemaUpdate()
.fieldName(transform.getUpdateFieldTransform().getFieldName())
.oldSchema(transform.getUpdateFieldTransform().getOldSchema())
.newSchema(transform.getUpdateFieldTransform().getNewSchema()));
} else {
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.converters;

import io.airbyte.api.model.generated.StreamDescriptor;

/**
* Utilities that convert protocol types into API representations of the protocol type.
*/
public class ProtocolConverters {

public static StreamDescriptor streamDescriptorToApi(final io.airbyte.protocol.models.StreamDescriptor protocolStreamDescriptor) {
return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.collect.Lists;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.api.model.generated.AirbyteCatalog;
import io.airbyte.api.model.generated.CatalogDiff;
import io.airbyte.api.model.generated.ConnectionCreate;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionReadList;
Expand All @@ -33,10 +34,12 @@
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.converters.ApiPojoConverters;
import io.airbyte.server.converters.CatalogDiffConverters;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.server.handlers.helpers.ConnectionMatcher;
import io.airbyte.server.handlers.helpers.DestinationMatcher;
Expand Down Expand Up @@ -256,6 +259,15 @@ public ConnectionRead getConnection(final UUID connectionId)
return buildConnectionRead(connectionId);
}

public static CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
return new CatalogDiff().transforms(CatalogHelpers.getCatalogDiff(
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(oldCatalog)),
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(newCatalog)))
.stream()
.map(CatalogDiffConverters::streamTransformToApi)
.toList());
}

public Optional<AirbyteCatalog> getConnectionAirbyteCatalog(final UUID connectionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSync connection = configRepository.getStandardSync(connectionId);
Expand Down Expand Up @@ -303,15 +315,15 @@ public boolean matchSearch(final ConnectionSearch connectionSearch, final Connec
matchSearch(connectionSearch.getDestination(), destinationRead);
}

// todo (cgardens) - make this static. requires removing one bad dependence in SourceHandlerTest
// todo (cgardens) - make this static. requires removing one bad dependency in SourceHandlerTest
public boolean matchSearch(final SourceSearch sourceSearch, final SourceRead sourceRead) {
final SourceMatcher sourceMatcher = new SourceMatcher(sourceSearch);
final SourceRead sourceReadFromSearch = sourceMatcher.match(sourceRead);

return (sourceReadFromSearch == null || sourceReadFromSearch.equals(sourceRead));
}

// todo (cgardens) - make this static. requires removing one bad dependence in
// todo (cgardens) - make this static. requires removing one bad dependency in
// DestinationHandlerTest
public boolean matchSearch(final DestinationSearch destinationSearch, final DestinationRead destinationRead) {
final DestinationMatcher destinationMatcher = new DestinationMatcher(destinationSearch);
Expand Down
Loading

0 comments on commit bbb340f

Please sign in to comment.