diff --git a/airbyte-api/build.gradle b/airbyte-api/build.gradle index 3ab11ed2e37b..f9314d1b0c64 100644 --- a/airbyte-api/build.gradle +++ b/airbyte-api/build.gradle @@ -29,6 +29,7 @@ task generateApiServer(type: GenerateTask) { 'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode', 'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode', 'StateBlob' : 'com.fasterxml.jackson.databind.JsonNode', + 'FieldSchema' : 'com.fasterxml.jackson.databind.JsonNode', ] generateApiDocumentation = false @@ -71,6 +72,7 @@ task generateApiClient(type: GenerateTask) { 'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode', 'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode', 'StateBlob' : 'com.fasterxml.jackson.databind.JsonNode', + 'FieldSchema' : 'com.fasterxml.jackson.databind.JsonNode', ] library = "native" @@ -104,6 +106,7 @@ task generateApiDocs(type: GenerateTask) { 'DestinationConfiguration' : 'com.fasterxml.jackson.databind.JsonNode', 'StreamJsonSchema' : 'com.fasterxml.jackson.databind.JsonNode', 'StateBlob' : 'com.fasterxml.jackson.databind.JsonNode', + 'FieldSchema' : 'com.fasterxml.jackson.databind.JsonNode', ] generateApiDocumentation = false diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 53b09f2d76ea..7213ca4a161a 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -4140,7 +4140,6 @@ components: FieldSchema: description: JSONSchema representation of the field type: object - additionalProperties: true ActorDefinitionResourceRequirements: description: actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overriden by the job type specific configurations. if not set, the platform will use defaults. these values will be overriden by configuration at the connection level. type: object diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java index f3044709bc90..5ef450def3e0 100644 --- a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java @@ -68,6 +68,64 @@ public static ConfiguredAirbyteStream createConfiguredAirbyteStream(final String .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE); } + /** + * Converts a {@link ConfiguredAirbyteCatalog} into an {@link AirbyteCatalog}. This is possible + * because the latter is a subset of the former. + * + * @param configuredCatalog - catalog to convert + * @return - airbyte catalog + */ + public static AirbyteCatalog configuredCatalogToCatalog(final ConfiguredAirbyteCatalog configuredCatalog) { + return new AirbyteCatalog().withStreams( + configuredCatalog.getStreams() + .stream() + .map(ConfiguredAirbyteStream::getStream) + .collect(Collectors.toList())); + } + + /** + * Extracts {@link StreamDescriptor} for a given {@link AirbyteStream} + * + * @param airbyteStream stream + * @return stream descriptor + */ + public static StreamDescriptor extractDescriptor(final ConfiguredAirbyteStream airbyteStream) { + return extractDescriptor(airbyteStream.getStream()); + } + + /** + * Extracts {@link StreamDescriptor} for a given {@link ConfiguredAirbyteStream} + * + * @param airbyteStream stream + * @return stream descriptor + */ + public static StreamDescriptor extractDescriptor(final AirbyteStream airbyteStream) { + return new StreamDescriptor().withName(airbyteStream.getName()).withNamespace(airbyteStream.getNamespace()); + } + + /** + * Extracts {@link StreamDescriptor}s for each stream in a given {@link ConfiguredAirbyteCatalog} + * + * @param configuredCatalog catalog + * @return list of stream descriptors + */ + public static List extractStreamDescriptors(final ConfiguredAirbyteCatalog configuredCatalog) { + return extractStreamDescriptors(configuredCatalogToCatalog(configuredCatalog)); + } + + /** + * Extracts {@link StreamDescriptor}s for each stream in a given {@link AirbyteCatalog} + * + * @param catalog catalog + * @return list of stream descriptors + */ + public static List extractStreamDescriptors(final AirbyteCatalog catalog) { + return catalog.getStreams() + .stream() + .map(abStream -> new StreamDescriptor().withName(abStream.getName()).withNamespace(abStream.getNamespace())) + .toList(); + } + /** * Convert a Catalog into a ConfiguredCatalog. This applies minimum default to the Catalog to make * it a valid ConfiguredCatalog. diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransform.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransform.java index af5d9b48037d..485ef2b122e7 100644 --- a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransform.java +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransform.java @@ -43,4 +43,20 @@ public static FieldTransform createUpdateFieldTransform(final UpdateFieldTransfo return new FieldTransform(FieldTransformType.UPDATE_FIELD, null, null, updateFieldTransform); } + public FieldTransformType getTransformType() { + return transformType; + } + + public AddFieldTransform getAddFieldTransform() { + return addFieldTransform; + } + + public RemoveFieldTransform getRemoveFieldTransform() { + return removeFieldTransform; + } + + public UpdateFieldTransform getUpdateFieldTransform() { + return updateFieldTransform; + } + } diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveStreamTransform.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveStreamTransform.java index a5839ab0568c..c2582f37b71f 100644 --- a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveStreamTransform.java +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveStreamTransform.java @@ -20,4 +20,8 @@ public class RemoveStreamTransform { private final StreamDescriptor streamDescriptor; + public StreamDescriptor getStreamDescriptor() { + return streamDescriptor; + } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/CatalogDiffConverters.java b/airbyte-server/src/main/java/io/airbyte/server/converters/CatalogDiffConverters.java new file mode 100644 index 000000000000..5817d5012a0a --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/CatalogDiffConverters.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.converters; + +import io.airbyte.api.model.generated.FieldNameAndSchema; +import io.airbyte.api.model.generated.FieldSchemaUpdate; +import io.airbyte.api.model.generated.FieldTransform; +import io.airbyte.api.model.generated.StreamDescriptor; +import io.airbyte.api.model.generated.StreamTransform; +import io.airbyte.commons.enums.Enums; +import io.airbyte.protocol.models.transform_models.FieldTransformType; +import io.airbyte.protocol.models.transform_models.StreamTransformType; +import java.util.List; +import java.util.Optional; + +/** + * Utility methods for converting between internal and API representation of catalog diffs. + */ +public class CatalogDiffConverters { + + public static StreamTransform streamTransformToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) { + return new StreamTransform() + .transformType(Enums.convertTo(transform.getTransformType(), StreamTransform.TransformTypeEnum.class)) + .addStream(addStreamToApi(transform).orElse(null)) + .removeStream(removeStreamToApi(transform).orElse(null)) + .updateStream(updateStreamToApi(transform).orElse(null)); + } + + public static Optional addStreamToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) { + if (transform.getTransformType() == StreamTransformType.ADD_STREAM) { + return Optional.ofNullable(ProtocolConverters.streamDescriptorToApi(transform.getAddStreamTransform().getStreamDescriptor())); + } else { + return Optional.empty(); + } + } + + public static Optional removeStreamToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) { + if (transform.getTransformType() == StreamTransformType.REMOVE_STREAM) { + return Optional.ofNullable(ProtocolConverters.streamDescriptorToApi(transform.getRemoveStreamTransform().getStreamDescriptor())); + } else { + return Optional.empty(); + } + } + + public static Optional> updateStreamToApi(final io.airbyte.protocol.models.transform_models.StreamTransform transform) { + if (transform.getTransformType() == StreamTransformType.UPDATE_STREAM) { + return Optional.ofNullable(transform.getUpdateStreamTransform() + .getFieldTransforms() + .stream() + .map(CatalogDiffConverters::fieldTransformToApi) + .toList()); + } else { + return Optional.empty(); + } + } + + public static FieldTransform fieldTransformToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) { + return new FieldTransform() + .transformType(Enums.convertTo(transform.getTransformType(), FieldTransform.TransformTypeEnum.class)) + .addField(addFieldToApi(transform).orElse(null)) + .removeField(removeFieldToApi(transform).orElse(null)) + .updateFieldSchema(updateFieldToApi(transform).orElse(null)); + } + + private static Optional addFieldToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) { + if (transform.getTransformType() == FieldTransformType.ADD_FIELD) { + return Optional.of(new FieldNameAndSchema() + .fieldName(transform.getAddFieldTransform().getFieldName()) + .fieldSchema(transform.getAddFieldTransform().getSchema())); + } else { + return Optional.empty(); + } + } + + private static Optional removeFieldToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) { + if (transform.getTransformType() == FieldTransformType.REMOVE_FIELD) { + return Optional.of(new FieldNameAndSchema() + .fieldName(transform.getRemoveFieldTransform().getFieldName()) + .fieldSchema(transform.getRemoveFieldTransform().getSchema())); + } else { + return Optional.empty(); + } + } + + private static Optional updateFieldToApi(final io.airbyte.protocol.models.transform_models.FieldTransform transform) { + if (transform.getTransformType() == FieldTransformType.UPDATE_FIELD) { + return Optional.of(new FieldSchemaUpdate() + .fieldName(transform.getUpdateFieldTransform().getFieldName()) + .oldSchema(transform.getUpdateFieldTransform().getOldSchema()) + .newSchema(transform.getUpdateFieldTransform().getNewSchema())); + } else { + return Optional.empty(); + } + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/ProtocolConverters.java b/airbyte-server/src/main/java/io/airbyte/server/converters/ProtocolConverters.java new file mode 100644 index 000000000000..b71771e76da9 --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/ProtocolConverters.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.converters; + +import io.airbyte.api.model.generated.StreamDescriptor; + +/** + * Utilities that convert protocol types into API representations of the protocol type. + */ +public class ProtocolConverters { + + public static StreamDescriptor streamDescriptorToApi(final io.airbyte.protocol.models.StreamDescriptor protocolStreamDescriptor) { + return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace()); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index 5e46b7cce9eb..2f58d4f66646 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -10,6 +10,7 @@ import com.google.common.collect.Lists; import io.airbyte.analytics.TrackingClient; import io.airbyte.api.model.generated.AirbyteCatalog; +import io.airbyte.api.model.generated.CatalogDiff; import io.airbyte.api.model.generated.ConnectionCreate; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.ConnectionReadList; @@ -33,10 +34,12 @@ import io.airbyte.config.helpers.ScheduleHelpers; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.scheduler.client.EventRunner; import io.airbyte.scheduler.persistence.WorkspaceHelper; import io.airbyte.server.converters.ApiPojoConverters; +import io.airbyte.server.converters.CatalogDiffConverters; import io.airbyte.server.handlers.helpers.CatalogConverter; import io.airbyte.server.handlers.helpers.ConnectionMatcher; import io.airbyte.server.handlers.helpers.DestinationMatcher; @@ -256,6 +259,15 @@ public ConnectionRead getConnection(final UUID connectionId) return buildConnectionRead(connectionId); } + public static CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) { + return new CatalogDiff().transforms(CatalogHelpers.getCatalogDiff( + CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(oldCatalog)), + CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(newCatalog))) + .stream() + .map(CatalogDiffConverters::streamTransformToApi) + .toList()); + } + public Optional getConnectionAirbyteCatalog(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException { final StandardSync connection = configRepository.getStandardSync(connectionId); @@ -303,7 +315,7 @@ public boolean matchSearch(final ConnectionSearch connectionSearch, final Connec matchSearch(connectionSearch.getDestination(), destinationRead); } - // todo (cgardens) - make this static. requires removing one bad dependence in SourceHandlerTest + // todo (cgardens) - make this static. requires removing one bad dependency in SourceHandlerTest public boolean matchSearch(final SourceSearch sourceSearch, final SourceRead sourceRead) { final SourceMatcher sourceMatcher = new SourceMatcher(sourceSearch); final SourceRead sourceReadFromSearch = sourceMatcher.match(sourceRead); @@ -311,7 +323,7 @@ public boolean matchSearch(final SourceSearch sourceSearch, final SourceRead sou return (sourceReadFromSearch == null || sourceReadFromSearch.equals(sourceRead)); } - // todo (cgardens) - make this static. requires removing one bad dependence in + // todo (cgardens) - make this static. requires removing one bad dependency in // DestinationHandlerTest public boolean matchSearch(final DestinationSearch destinationSearch, final DestinationRead destinationRead) { final DestinationMatcher destinationMatcher = new DestinationMatcher(destinationSearch); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index c947bde6997b..8d0f27885842 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -13,6 +13,7 @@ import io.airbyte.api.model.generated.AirbyteStream; import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration; import io.airbyte.api.model.generated.AirbyteStreamConfiguration; +import io.airbyte.api.model.generated.CatalogDiff; import io.airbyte.api.model.generated.ConnectionCreate; import io.airbyte.api.model.generated.ConnectionIdRequestBody; import io.airbyte.api.model.generated.ConnectionRead; @@ -57,7 +58,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.function.Predicate; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -74,6 +74,7 @@ public class WebBackendConnectionsHandler { private final SchedulerHandler schedulerHandler; private final OperationsHandler operationsHandler; private final EventRunner eventRunner; + // todo (cgardens) - this handler should NOT have access to the db. only access via handler. private final ConfigRepository configRepository; public WebBackendWorkspaceStateResult getWorkspaceState(final WebBackendWorkspaceState webBackendWorkspaceState) throws IOException { @@ -113,14 +114,20 @@ private WebBackendConnectionRead buildWebBackendConnectionRead(final ConnectionR final SourceRead source = getSourceRead(connectionRead); final DestinationRead destination = getDestinationRead(connectionRead); final OperationReadList operations = getOperationReadList(connectionRead); - final WebBackendConnectionRead WebBackendConnectionRead = getWebBackendConnectionRead(connectionRead, source, destination, operations); - final JobReadList syncJobReadList = getSyncJobs(connectionRead); - final Predicate hasRunningJob = (JobRead job) -> !TERMINAL_STATUSES.contains(job.getStatus()); - WebBackendConnectionRead.setIsSyncing(syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).anyMatch(hasRunningJob)); - setLatestSyncJobProperties(WebBackendConnectionRead, syncJobReadList); - WebBackendConnectionRead.setCatalogId(connectionRead.getSourceCatalogId()); - return WebBackendConnectionRead; + + final WebBackendConnectionRead webBackendConnectionRead = getWebBackendConnectionRead(connectionRead, source, destination, operations) + .catalogId(connectionRead.getSourceCatalogId()) + .isSyncing(syncJobReadList.getJobs() + .stream() + .map(JobWithAttemptsRead::getJob) + .anyMatch(WebBackendConnectionsHandler::isRunningJob)); + setLatestSyncJobProperties(webBackendConnectionRead, syncJobReadList); + return webBackendConnectionRead; + } + + private static boolean isRunningJob(final JobRead job) { + return !TERMINAL_STATUSES.contains(job.getStatus()); } private SourceRead getSourceRead(final ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException { @@ -140,10 +147,10 @@ private OperationReadList getOperationReadList(final ConnectionRead connectionRe return operationsHandler.listOperationsForConnection(connectionIdRequestBody); } - private WebBackendConnectionRead getWebBackendConnectionRead(final ConnectionRead connectionRead, - final SourceRead source, - final DestinationRead destination, - final OperationReadList operations) { + private static WebBackendConnectionRead getWebBackendConnectionRead(final ConnectionRead connectionRead, + final SourceRead source, + final DestinationRead destination, + final OperationReadList operations) { return new WebBackendConnectionRead() .connectionId(connectionRead.getConnectionId()) .sourceId(connectionRead.getSourceId()) @@ -169,7 +176,7 @@ private JobReadList getSyncJobs(final ConnectionRead connectionRead) throws IOEx return jobHistoryHandler.listJobsFor(jobListRequestBody); } - private void setLatestSyncJobProperties(final WebBackendConnectionRead WebBackendConnectionRead, final JobReadList syncJobReadList) { + private static void setLatestSyncJobProperties(final WebBackendConnectionRead WebBackendConnectionRead, final JobReadList syncJobReadList) { syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).findFirst() .ifPresent(job -> { WebBackendConnectionRead.setLatestSyncJobCreatedAt(job.getCreatedAt()); @@ -199,8 +206,9 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti final Optional discovered; if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) { - final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = - new SourceDiscoverSchemaRequestBody().sourceId(connection.getSourceId()).disableCache(true); + final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody() + .sourceId(connection.getSourceId()) + .disableCache(true); final SourceDiscoverSchemaRead discoverSchema = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq); discovered = Optional.of(discoverSchema.getCatalog()); @@ -209,13 +217,17 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti discovered = connectionsHandler.getConnectionAirbyteCatalog(webBackendConnectionRequestBody.getConnectionId()); } final AirbyteCatalog original = connection.getSyncCatalog(); + final CatalogDiff diff; if (discovered.isPresent()) { final AirbyteCatalog combined = updateSchemaWithDiscovery(original, discovered.get()); connection.setSyncCatalog(combined); + diff = ConnectionsHandler.getDiff(original, discovered.get()); } else { connection.setSyncCatalog(original); + diff = null; } - return buildWebBackendConnectionRead(connection); + + return buildWebBackendConnectionRead(connection).catalogDiff(diff); } @VisibleForTesting @@ -328,10 +340,6 @@ private List updateOperations(final WebBackendConnectionUpdate webBackendC return operationIds; } - private UUID getWorkspaceIdForSource(final UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException { - return sourceHandler.getSource(new SourceIdRequestBody().sourceId(sourceId)).getWorkspaceId(); - } - @VisibleForTesting protected static OperationCreate toOperationCreate(final WebBackendOperationCreateOrUpdate operationCreateOrUpdate) { final OperationCreate operationCreate = new OperationCreate(); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java index e4584ed77ab5..ad2ae7e4e194 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java @@ -4,7 +4,10 @@ package io.airbyte.server.handlers.helpers; +import io.airbyte.api.model.generated.AirbyteCatalog; +import io.airbyte.api.model.generated.AirbyteStream; import io.airbyte.commons.enums.Enums; +import io.airbyte.commons.json.Jsons; import io.airbyte.commons.text.Names; import java.util.List; import java.util.stream.Collectors; @@ -83,6 +86,29 @@ public static io.airbyte.api.model.generated.AirbyteCatalog toApi(final io.airby return new io.airbyte.api.model.generated.AirbyteCatalog().streams(streams); } + /** + * Converts the API catalog model into a protocol catalog. Note: returns all streams, regardless of + * selected status. See {@link CatalogConverter#toProtocol(AirbyteStream)} for context. + * + * @param catalog api catalog + * @return protocol catalog + */ + public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocolKeepAllStreams(final io.airbyte.api.model.generated.AirbyteCatalog catalog) { + final AirbyteCatalog clone = Jsons.clone(catalog); + clone.getStreams().forEach(stream -> stream.getConfig().setSelected(true)); + return toProtocol(clone); + } + + /** + * Converts the API catalog model into a protocol catalog. Note: only streams marked as selected + * will be returned. This is included in this converter as the API model always carries all the + * streams it has access to and then marks the ones that should not be used as not selected, while + * the protocol version just uses the presence of the streams as evidence that it should be + * included. + * + * @param catalog api catalog + * @return protocol catalog + */ public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocol(final io.airbyte.api.model.generated.AirbyteCatalog catalog) { final List streams = catalog.getStreams() .stream() diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 17d3b167691a..ad078b9e80c5 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -21,6 +21,7 @@ import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration; import io.airbyte.api.model.generated.AttemptRead; import io.airbyte.api.model.generated.AttemptStatus; +import io.airbyte.api.model.generated.CatalogDiff; import io.airbyte.api.model.generated.ConnectionCreate; import io.airbyte.api.model.generated.ConnectionIdRequestBody; import io.airbyte.api.model.generated.ConnectionRead; @@ -49,6 +50,9 @@ import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.api.model.generated.SourceIdRequestBody; import io.airbyte.api.model.generated.SourceRead; +import io.airbyte.api.model.generated.StreamDescriptor; +import io.airbyte.api.model.generated.StreamTransform; +import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum; import io.airbyte.api.model.generated.SyncMode; import io.airbyte.api.model.generated.SynchronousJobRead; import io.airbyte.api.model.generated.WebBackendConnectionCreate; @@ -78,7 +82,6 @@ import io.airbyte.server.helpers.SourceDefinitionHelpers; import io.airbyte.server.helpers.SourceHelpers; import io.airbyte.validation.json.JsonValidationException; -import io.airbyte.workers.helper.ConnectionHelper; import java.io.IOException; import java.lang.reflect.Method; import java.time.Instant; @@ -105,7 +108,6 @@ class WebBackendConnectionsHandlerTest { private WebBackendConnectionRead expected; private WebBackendConnectionRead expectedWithNewSchema; private EventRunner eventRunner; - private ConnectionHelper connectionHelper; private ConfigRepository configRepository; @BeforeEach @@ -118,7 +120,6 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE configRepository = mock(ConfigRepository.class); schedulerHandler = mock(SchedulerHandler.class); eventRunner = mock(EventRunner.class); - connectionHelper = mock(ConnectionHelper.class); wbHandler = new WebBackendConnectionsHandler(connectionsHandler, sourceHandler, destinationHandler, @@ -228,6 +229,10 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE .latestSyncJobCreatedAt(expected.getLatestSyncJobCreatedAt()) .latestSyncJobStatus(expected.getLatestSyncJobStatus()) .isSyncing(expected.getIsSyncing()) + .catalogDiff(new CatalogDiff().transforms(List.of( + new StreamTransform().transformType(TransformTypeEnum.ADD_STREAM) + .addStream(new StreamDescriptor().name("users-data1")) + .updateStream(null)))) .resourceRequirements(new ResourceRequirements() .cpuRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getCpuRequest()) .cpuLimit(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getCpuLimit()) @@ -350,7 +355,6 @@ public WebBackendConnectionRead testWebBackendGetConnection(final boolean withCa when(operationsHandler.listOperationsForConnection(connectionIdRequestBody)).thenReturn(operationReadList); return wbHandler.webBackendGetConnection(webBackendConnectionIdRequestBody); - } @Test @@ -468,10 +472,12 @@ public void testForConnectionCreateCompleteness() { .collect(Collectors.toSet()); final String message = - "If this test is failing, it means you added a field to ConnectionCreate!\nCongratulations, but you're not done yet..\n" - + "\tYou should update WebBackendConnectionsHandler::toConnectionCreate\n" - + "\tand ensure that the field is tested in WebBackendConnectionsHandlerTest::testToConnectionCreate\n" - + "Then you can add the field name here to make this test pass. Cheers!"; + """ + If this test is failing, it means you added a field to ConnectionCreate! + Congratulations, but you're not done yet.. + \tYou should update WebBackendConnectionsHandler::toConnectionCreate + \tand ensure that the field is tested in WebBackendConnectionsHandlerTest::testToConnectionCreate + Then you can add the field name here to make this test pass. Cheers!"""; assertEquals(handledMethods, methods, message); } @@ -487,10 +493,12 @@ public void testForConnectionUpdateCompleteness() { .collect(Collectors.toSet()); final String message = - "If this test is failing, it means you added a field to ConnectionUpdate!\nCongratulations, but you're not done yet..\n" - + "\tYou should update WebBackendConnectionsHandler::toConnectionUpdate\n" - + "\tand ensure that the field is tested in WebBackendConnectionsHandlerTest::testToConnectionUpdate\n" - + "Then you can add the field name here to make this test pass. Cheers!"; + """ + If this test is failing, it means you added a field to ConnectionUpdate! + Congratulations, but you're not done yet.. + \tYou should update WebBackendConnectionsHandler::toConnectionUpdate + \tand ensure that the field is tested in WebBackendConnectionsHandlerTest::testToConnectionUpdate + Then you can add the field name here to make this test pass. Cheers!"""; assertEquals(handledMethods, methods, message); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java index 0e1160955a07..daf28b3575b3 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java @@ -35,7 +35,8 @@ public class ConnectionHelpers { - private static final String STREAM_NAME = "users-data"; + private static final String STREAM_NAME_BASE = "users-data"; + private static final String STREAM_NAME = STREAM_NAME_BASE + "0"; private static final String FIELD_NAME = "id"; private static final String BASIC_SCHEDULE_TIME_UNIT = "days"; private static final long BASIC_SCHEDULE_UNITS = 1L; @@ -200,7 +201,7 @@ private static io.airbyte.protocol.models.AirbyteStream generateBasicAirbyteStre public static AirbyteCatalog generateBasicApiCatalog() { return new AirbyteCatalog().streams(Lists.newArrayList(new AirbyteStreamAndConfiguration() - .stream(generateBasicApiStream()) + .stream(generateBasicApiStream(null)) .config(generateBasicApiStreamConfig()))); } @@ -208,7 +209,7 @@ public static AirbyteCatalog generateMultipleStreamsApiCatalog(final int streams final List streamAndConfigurations = new ArrayList<>(); for (int i = 0; i < streamsCount; i++) { streamAndConfigurations.add(new AirbyteStreamAndConfiguration() - .stream(generateBasicApiStream()) + .stream(generateBasicApiStream(String.valueOf(i))) .config(generateBasicApiStreamConfig())); } return new AirbyteCatalog().streams(streamAndConfigurations); @@ -225,8 +226,12 @@ private static AirbyteStreamConfiguration generateBasicApiStreamConfig() { } private static AirbyteStream generateBasicApiStream() { + return generateBasicApiStream(null); + } + + private static AirbyteStream generateBasicApiStream(final String nameSuffix) { return new AirbyteStream() - .name(STREAM_NAME) + .name(nameSuffix == null ? STREAM_NAME : STREAM_NAME_BASE + nameSuffix) .jsonSchema(generateBasicJsonSchema()) .defaultCursorField(Lists.newArrayList(FIELD_NAME)) .sourceDefinedCursor(false) diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index edbdaebfcd9f..89a1478de4a4 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -8016,49 +8016,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] }, { @@ -8073,49 +8049,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] } ] @@ -8291,49 +8243,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] }, { @@ -8348,49 +8276,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] } ] @@ -8630,49 +8534,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] }, { @@ -8687,49 +8567,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] } ] @@ -8852,49 +8708,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] }, { @@ -8909,49 +8741,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] } ] @@ -9132,49 +8940,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] }, { @@ -9189,49 +8973,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] } ] @@ -9354,49 +9114,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] }, { @@ -9411,49 +9147,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] } ] @@ -9634,49 +9346,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] }, { @@ -9691,49 +9379,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] } ] @@ -9856,49 +9520,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] }, { @@ -9913,49 +9553,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] } ] @@ -10132,49 +9748,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] }, { @@ -10189,49 +9781,25 @@

Example data

}, "updateStream" : [ { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } }, { "updateFieldSchema" : { - "fieldName" : [ "fieldName", "fieldName" ], - "oldSchema" : { - "key" : "{}" - }, - "newSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "addField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] }, "transformType" : "add_field", "removeField" : { - "fieldName" : [ "fieldName", "fieldName" ], - "fieldSchema" : { - "key" : "{}" - } + "fieldName" : [ "fieldName", "fieldName" ] } } ] } ] @@ -11655,7 +11223,7 @@

FieldNameAndSchema -
fieldName
-
fieldSchema
map[String, Object] JSONSchema representation of the field
+
fieldSchema
@@ -11663,8 +11231,8 @@

FieldSchemaUpdate -
fieldName
-
oldSchema
map[String, Object] JSONSchema representation of the field
-
newSchema
map[String, Object] JSONSchema representation of the field
+
oldSchema
+
newSchema