diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 6c7976473041..a74c90e8ca7d 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.27.5-alpha +current_version = 0.28.0-alpha commit = True tag = False parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)? diff --git a/.env b/.env index bbda3bf36f1e..1cf70f599bf0 100644 --- a/.env +++ b/.env @@ -1,4 +1,4 @@ -VERSION=0.27.5-alpha +VERSION=0.28.0-alpha # Airbyte Internal Job Database, see https://docs.airbyte.io/operator-guides/configuring-airbyte-db DATABASE_USER=docker diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index fff151cb7d65..9a81a5e8ff87 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -952,7 +952,7 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/OperationCreateOrUpdate" + $ref: "#/components/schemas/OperatorConfiguration" required: true responses: "200": @@ -2074,7 +2074,7 @@ components: operations: type: array items: - $ref: "#/components/schemas/OperationCreateOrUpdate" + $ref: "#/components/schemas/WebBackendOperationCreateOrUpdate" ConnectionRead: type: object required: @@ -2173,7 +2173,10 @@ required: - name - operatorConfiguration + - workspaceId properties: + workspaceId: + $ref: "#/components/schemas/WorkspaceId" name: type: string operatorConfiguration: @@ -2191,7 +2194,7 @@ type: string operatorConfiguration: $ref: "#/components/schemas/OperatorConfiguration" - OperationCreateOrUpdate: + WebBackendOperationCreateOrUpdate: type: object required: - name @@ -2209,7 +2212,10 @@ - operationId - name - operatorConfiguration + - workspaceId properties: + workspaceId: + $ref: "#/components/schemas/WorkspaceId" operationId: $ref: "#/components/schemas/OperationId" name: diff --git a/airbyte-config/models/src/main/resources/types/StandardSyncOperation.yaml b/airbyte-config/models/src/main/resources/types/StandardSyncOperation.yaml index b6b146865222..d053e3fe8db1 100644 --- a/airbyte-config/models/src/main/resources/types/StandardSyncOperation.yaml +++ b/airbyte-config/models/src/main/resources/types/StandardSyncOperation.yaml @@ -5,6 +5,7 @@ title: StandardSyncOperation description: Configuration of an operation to apply during a sync type: object required: + - workspaceId - operationId - name - operatorType @@ -28,3 +29,6 @@ properties: if not set or false, the configuration is active. if true, then this configuration is permanently off. type: boolean + workspaceId: + type: string + format: uuid diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/PersistenceConstants.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/PersistenceConstants.java index 0f5203484066..9525eefcc62f 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/PersistenceConstants.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/PersistenceConstants.java @@ -29,6 +29,6 @@ public class PersistenceConstants { // for MVP we only support one workspace per deployment and we hard code its id.
- public static UUID DEFAULT_WORKSPACE_ID = UUID.fromString("5ae6b09b-fdec-41af-aaf7-7d94cfc33ef6"); + public static final UUID DEFAULT_WORKSPACE_ID = UUID.fromString("5ae6b09b-fdec-41af-aaf7-7d94cfc33ef6"); } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/WorkspaceFinder.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/WorkspaceFinder.java new file mode 100644 index 000000000000..549fe91d7c06 --- /dev/null +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/WorkspaceFinder.java @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.config.persistence; + +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.SourceConnection; +import java.util.UUID; + +// todo (cgardens) - this is just a utility for this PR. will need to figure out the "right" way to +// do this with jared. +public class WorkspaceFinder { + + public static UUID getWorkspaceForSourceId(SourceConnection source) { + return source.getWorkspaceId(); + } + + public static UUID getWorkspaceForDestination(DestinationConnection destination) { + return destination.getWorkspaceId(); + } + +} diff --git a/airbyte-migration/src/main/java/io/airbyte/migrate/Migrations.java b/airbyte-migration/src/main/java/io/airbyte/migrate/Migrations.java index 337faae86fee..3a411ed424d9 100644 --- a/airbyte-migration/src/main/java/io/airbyte/migrate/Migrations.java +++ b/airbyte-migration/src/main/java/io/airbyte/migrate/Migrations.java @@ -35,6 +35,7 @@ import io.airbyte.migrate.migrations.MigrationV0_25_0; import io.airbyte.migrate.migrations.MigrationV0_26_0; import io.airbyte.migrate.migrations.MigrationV0_27_0; +import io.airbyte.migrate.migrations.MigrationV0_28_0; import io.airbyte.migrate.migrations.NoOpMigration; import java.util.List; @@ -55,6 +56,7 @@ public class Migrations { private static final Migration MIGRATION_V_0_25_0 = new MigrationV0_25_0(MIGRATION_V_0_24_0); private static final Migration MIGRATION_V_0_26_0 = new MigrationV0_26_0(MIGRATION_V_0_25_0); private static final Migration MIGRATION_V_0_27_0 = new MigrationV0_27_0(MIGRATION_V_0_26_0); + private static final Migration MIGRATION_V_0_28_0 = new MigrationV0_28_0(MIGRATION_V_0_27_0); // all migrations must be added to the list in the order that they should be applied. 
public static final List<Migration> MIGRATIONS = ImmutableList.of( @@ -72,6 +74,7 @@ public class Migrations { MIGRATION_V_0_24_0, MIGRATION_V_0_25_0, MIGRATION_V_0_26_0, - MIGRATION_V_0_27_0); + MIGRATION_V_0_27_0, + MIGRATION_V_0_28_0); } diff --git a/airbyte-migration/src/main/java/io/airbyte/migrate/migrations/MigrationV0_28_0.java b/airbyte-migration/src/main/java/io/airbyte/migrate/migrations/MigrationV0_28_0.java new file mode 100644 index 000000000000..136aa831766e --- /dev/null +++ b/airbyte-migration/src/main/java/io/airbyte/migrate/migrations/MigrationV0_28_0.java @@ -0,0 +1,139 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.migrate.migrations; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.migrate.Migration; +import io.airbyte.migrate.MigrationUtils; +import io.airbyte.migrate.ResourceId; +import io.airbyte.migrate.ResourceType; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.stream.Stream; + +public class MigrationV0_28_0 extends BaseMigration implements Migration { + + private static final UUID DEFAULT_WORKSPACE_ID = UUID.fromString("5ae6b09b-fdec-41af-aaf7-7d94cfc33ef6"); + + private static final ResourceId CONNECTION_RESOURCE_ID = ResourceId.fromConstantCase(ResourceType.CONFIG, "STANDARD_SYNC"); + private static final ResourceId SOURCE_RESOURCE_ID = ResourceId.fromConstantCase(ResourceType.CONFIG, "SOURCE_CONNECTION"); + private static final ResourceId OPERATION_RESOURCE_ID = ResourceId.fromConstantCase(ResourceType.CONFIG, "STANDARD_SYNC_OPERATION"); + + private static final String MIGRATION_VERSION = "0.28.0-alpha"; + @VisibleForTesting + protected final Migration previousMigration; + + public MigrationV0_28_0(Migration previousMigration) { + super(previousMigration); + this.previousMigration = previousMigration; + } + + @Override + public String getVersion() { + return MIGRATION_VERSION; + } + + private static final Path RESOURCE_PATH = Path.of("migrations/migrationV0_28_0/airbyte_config"); + + @Override + public Map<ResourceId, JsonNode> getOutputSchema() { + final Map<ResourceId, JsonNode> outputSchema = new HashMap<>(previousMigration.getOutputSchema()); + outputSchema.put( + OPERATION_RESOURCE_ID, + MigrationUtils.getSchemaFromResourcePath(RESOURCE_PATH,
OPERATION_RESOURCE_ID)); + return outputSchema; + } + + @Override + public void migrate(Map<ResourceId, Stream<JsonNode>> inputDataImmutable, + Map<ResourceId, Consumer<JsonNode>> outputData) { + // we need to figure out which workspace to associate an operation with. we use the following + // strategy to avoid ever storing too much info in memory: + // 1. iterate over connections stream + // 2. build mapping of connections to source + // 3. build mapping of operation ids to connections + // 4. iterate over sources stream + // 5. build mapping of sources to workspaces + // 6. iterate over operations stream, + // 7. map from operation => connection => source => workspace. set that workspace for the operation. + // 8. if no mapping use default workspace id + + final Map<UUID, UUID> connectionIdToSourceId = new HashMap<>(); + final Map<UUID, UUID> operationIdToConnectionId = new HashMap<>(); + final Map<UUID, UUID> sourceIdToWorkspaceId = new HashMap<>(); + + final Map<ResourceId, Stream<JsonNode>> inputData = new HashMap<>(inputDataImmutable); + // process connections. + inputData.remove(CONNECTION_RESOURCE_ID).forEach(r -> { + final UUID connectionId = UUID.fromString(r.get("connectionId").asText()); + final UUID sourceId = UUID.fromString(r.get("sourceId").asText()); + connectionIdToSourceId.put(connectionId, sourceId); + if (r.hasNonNull("operationIds")) { + r.get("operationIds").forEach(operationIdString -> { + final UUID operationId = UUID.fromString(operationIdString.asText()); + operationIdToConnectionId.put(operationId, connectionId); + }); + } + + outputData.get(CONNECTION_RESOURCE_ID).accept(r); + }); + // process sources. + inputData.remove(SOURCE_RESOURCE_ID).forEach(r -> { + final UUID sourceId = UUID.fromString(r.get("sourceId").asText()); + final UUID workspaceId = UUID.fromString(r.get("workspaceId").asText()); + sourceIdToWorkspaceId.put(sourceId, workspaceId); + + outputData.get(SOURCE_RESOURCE_ID).accept(r); + }); + // process operations. + inputData.remove(OPERATION_RESOURCE_ID).forEach(r -> { + final UUID operationId = UUID.fromString(r.get("operationId").asText()); + + final UUID workspaceId; + final UUID connectionId = operationIdToConnectionId.get(operationId); + if (connectionId == null) { + workspaceId = DEFAULT_WORKSPACE_ID; + } else { + final UUID sourceId = connectionIdToSourceId.get(connectionId); + workspaceId = sourceIdToWorkspaceId.get(sourceId); + } + ((ObjectNode) r).put("workspaceId", workspaceId.toString()); + + outputData.get(OPERATION_RESOURCE_ID).accept(r); + }); + + // process the remaining resources.
+ for (final Map.Entry<ResourceId, Stream<JsonNode>> entry : inputData.entrySet()) { + final Consumer<JsonNode> recordConsumer = outputData.get(entry.getKey()); + entry.getValue().forEach(recordConsumer); + } + } + +} diff --git a/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/OperatorDbt.yaml b/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/OperatorDbt.yaml new file mode 100644 index 000000000000..4ece33e9b90f --- /dev/null +++ b/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/OperatorDbt.yaml @@ -0,0 +1,18 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/OperatorDbt.yaml +title: OperatorDbt +description: Settings for a DBT operator +type: object +required: + - gitRepoUrl +additionalProperties: false +properties: + gitRepoUrl: + type: string + gitRepoBranch: + type: string + dockerImage: + type: string + dbtArguments: + type: string diff --git a/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/OperatorNormalization.yaml b/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/OperatorNormalization.yaml new file mode 100644 index 000000000000..d06d6364cb06 --- /dev/null +++ b/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/OperatorNormalization.yaml @@ -0,0 +1,13 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/OperatorNormalization.yaml +title: OperatorNormalization +description: Settings for a normalization operator +type: object +additionalProperties: false +properties: + option: + type: string + enum: + - basic + #- unnesting diff --git a/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/OperatorType.yaml b/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/OperatorType.yaml new file mode 100644 index 000000000000..756980539622 --- /dev/null +++ b/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/OperatorType.yaml @@ -0,0 +1,10 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/OperatorType.yaml +title: OperatorType +description: Type of Operator +type: string +enum: + # - destination + - normalization + - dbt diff --git a/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/StandardSyncOperation.yaml b/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/StandardSyncOperation.yaml new file mode 100644 index 000000000000..d053e3fe8db1 --- /dev/null +++ b/airbyte-migration/src/main/resources/migrations/migrationV0_28_0/airbyte_config/StandardSyncOperation.yaml @@ -0,0 +1,34 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StandardSyncOperation.yaml +title: StandardSyncOperation +description: Configuration of an operation to apply during a sync +type: object +required: + - workspaceId + - operationId + - name + - operatorType +additionalProperties: false +properties: + operationId: + type: string + format: uuid + name: + type: string + # Instead of this type field, we would prefer a json schema "oneOf" but unfortunately, + # the jsonschema2pojo does not
seem to support it yet: https://github.com/joelittlejohn/jsonschema2pojo/issues/392 + operatorType: + "$ref": OperatorType.yaml + operatorNormalization: + "$ref": OperatorNormalization.yaml + operatorDbt: + "$ref": OperatorDbt.yaml + tombstone: + description: + if not set or false, the configuration is active. if true, then this + configuration is permanently off. + type: boolean + workspaceId: + type: string + format: uuid diff --git a/airbyte-migration/src/test/java/io/airbyte/migrate/migrations/MigrateV0_24_0Test.java b/airbyte-migration/src/test/java/io/airbyte/migrate/migrations/MigrateV0_24_0Test.java index 6e802a276ff8..86d981d33393 100644 --- a/airbyte-migration/src/test/java/io/airbyte/migrate/migrations/MigrateV0_24_0Test.java +++ b/airbyte-migration/src/test/java/io/airbyte/migrate/migrations/MigrateV0_24_0Test.java @@ -90,9 +90,8 @@ void testMigration() throws IOException { .of(STANDARD_SYNC_RESOURCE_ID, getResourceStream(OUTPUT_CONFIG_PATH + "/STANDARD_SYNC.yaml") .collect(Collectors.toList())); - final Map> expectedOutput = - MigrationTestUtils - .createExpectedOutput(migration.getOutputSchema().keySet(), expectedOutputOverrides); + final Map> expectedOutput = MigrationTestUtils + .createExpectedOutput(migration.getOutputSchema().keySet(), expectedOutputOverrides); final Map> outputAsList = MigrationTestUtils .collectConsumersToList(outputConsumer); diff --git a/airbyte-migration/src/test/java/io/airbyte/migrate/migrations/MigrateV0_28_0Test.java b/airbyte-migration/src/test/java/io/airbyte/migrate/migrations/MigrateV0_28_0Test.java new file mode 100644 index 000000000000..47ff15e7d85f --- /dev/null +++ b/airbyte-migration/src/test/java/io/airbyte/migrate/migrations/MigrateV0_28_0Test.java @@ -0,0 +1,118 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + + package io.airbyte.migrate.migrations; + + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertNotNull; + + import com.fasterxml.jackson.databind.JsonNode; + import com.fasterxml.jackson.databind.node.ArrayNode; + import com.google.common.collect.ImmutableMap; + import io.airbyte.commons.functional.ListConsumer; + import io.airbyte.commons.resources.MoreResources; + import io.airbyte.commons.yaml.Yamls; + import io.airbyte.migrate.MigrationTestUtils; + import io.airbyte.migrate.MigrationUtils; + import io.airbyte.migrate.Migrations; + import io.airbyte.migrate.ResourceId; + import io.airbyte.migrate.ResourceType; + import java.io.IOException; + import java.util.List; + import java.util.Map; + import java.util.Spliterators; + import java.util.stream.Collectors; + import java.util.stream.Stream; + import java.util.stream.StreamSupport; + import org.junit.jupiter.api.Test; + + public class MigrateV0_28_0Test { + + private static final String INPUT_CONFIG_PATH = "migrations/migrationV0_28_0/input_config"; + private static final String OUTPUT_CONFIG_PATH = "migrations/migrationV0_28_0/output_config"; + + private static final ResourceId CONNECTION_RESOURCE_ID = ResourceId.fromConstantCase(ResourceType.CONFIG, "STANDARD_SYNC"); + private static final ResourceId SOURCE_RESOURCE_ID = ResourceId + .fromConstantCase(ResourceType.CONFIG, "SOURCE_CONNECTION"); + private static final ResourceId OPERATION_RESOURCE_ID = ResourceId + .fromConstantCase(ResourceType.CONFIG, "STANDARD_SYNC_OPERATION"); + + private Stream<JsonNode> getResourceStream(String resourcePath) throws IOException { + final ArrayNode nodeArray = (ArrayNode) Yamls + .deserialize(MoreResources.readResource(resourcePath)); + return StreamSupport + .stream(Spliterators.spliteratorUnknownSize(nodeArray.iterator(), 0), false); + } + + @Test + void testMigration() throws IOException { + final MigrationV0_28_0 migration = (MigrationV0_28_0) Migrations.MIGRATIONS + .stream() + .filter(m -> m instanceof MigrationV0_28_0) + .findAny() + .orElse(null); + assertNotNull(migration); + + final Map<ResourceId, Stream<JsonNode>> inputConfigs = ImmutableMap.of( + CONNECTION_RESOURCE_ID, + getResourceStream(INPUT_CONFIG_PATH + "/STANDARD_SYNC.yaml"), + SOURCE_RESOURCE_ID, + getResourceStream(INPUT_CONFIG_PATH + "/SOURCE_CONNECTION.yaml"), + OPERATION_RESOURCE_ID, + getResourceStream(INPUT_CONFIG_PATH + "/STANDARD_SYNC_OPERATION.yaml")); + + final Map<ResourceId, ListConsumer<JsonNode>> outputConsumer = MigrationTestUtils + .createOutputConsumer(migration.getOutputSchema().keySet()); + + migration.migrate(inputConfigs, MigrationUtils.mapRecordConsumerToConsumer(outputConsumer)); + + final Map<ResourceId, List<JsonNode>> expectedOutputOverrides = ImmutableMap.of( + CONNECTION_RESOURCE_ID, + getResourceStream(INPUT_CONFIG_PATH + "/STANDARD_SYNC.yaml") + .collect(Collectors.toList()), + SOURCE_RESOURCE_ID, + getResourceStream(INPUT_CONFIG_PATH + "/SOURCE_CONNECTION.yaml") + .collect(Collectors.toList()), + OPERATION_RESOURCE_ID, + getResourceStream(OUTPUT_CONFIG_PATH + "/STANDARD_SYNC_OPERATION.yaml") + .collect(Collectors.toList())); + + final Map<ResourceId, List<JsonNode>> expectedOutput = MigrationTestUtils + .createExpectedOutput(migration.getOutputSchema().keySet(), expectedOutputOverrides); + + final Map<ResourceId, List<JsonNode>> outputAsList = MigrationTestUtils + .collectConsumersToList(outputConsumer); + + assertExpectedOutput(expectedOutput, outputAsList); + } + + private void assertExpectedOutput(Map<ResourceId, List<JsonNode>> expected, Map<ResourceId, List<JsonNode>> actual) { + assertEquals(expected.keySet(), actual.keySet()); + expected.entrySet().forEach(entry -> {
assertEquals(entry.getValue(), actual.get(entry.getKey()), String.format("Resources output do not match for %s:", entry.getKey().getName())); + }); + assertEquals(expected, actual); + } + +} diff --git a/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/input_config/SOURCE_CONNECTION.yaml b/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/input_config/SOURCE_CONNECTION.yaml new file mode 100644 index 000000000000..c0768d99b868 --- /dev/null +++ b/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/input_config/SOURCE_CONNECTION.yaml @@ -0,0 +1,14 @@ +--- +- name: "source-1" + sourceDefinitionId: "25c5221d-dce2-4163-ade9-739ef790f503" + workspaceId: "f3ba292d-14c7-4a87-93ce-88b715b88eb1" + sourceId: "51755adc-32c3-4c97-ad5e-d163e48f78d9" + configuration: + ssl: false + password: "password" + username: "postgres" + schema: "public" + database: "postgres" + port: 3000 + host: "localhost" + tombstone: false diff --git a/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/input_config/STANDARD_SYNC.yaml b/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/input_config/STANDARD_SYNC.yaml new file mode 100644 index 000000000000..ac6cd13c5b0e --- /dev/null +++ b/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/input_config/STANDARD_SYNC.yaml @@ -0,0 +1,64 @@ +--- +- prefix: "" + sourceId: "51755adc-32c3-4c97-ad5e-d163e48f78d9" + destinationId: "d9e7c0ce-a713-42ba-9960-be6bac285e49" + connectionId: "c8e45fd7-9e12-42b0-ba11-b88c622e0195" + operationIds: ["8f75e15a-a427-4f12-94ff-b3130df50f92"] + name: "default" + catalog: + streams: + - stream: + name: "users" + json_schema: + type: "object" + properties: + id: + type: "number" + col1: + type: "string" + supported_sync_modes: + - "full_refresh" + - "incremental" + default_cursor_field: [] + source_defined_primary_key: + - - "id" + namespace: "public" + sync_mode: "incremental" + cursor_field: + - "id" + destination_sync_mode: "append_dedup" + primary_key: + - - "id" + status: "active" + manual: true +- prefix: "" + sourceId: "51755adc-32c3-4c97-ad5e-d163e48f78d9" + destinationId: "d9e7c0ce-a713-42ba-9960-be6bac285e49" + connectionId: "6b68b147-c60c-4c63-867e-82894707b947" + name: "default" + catalog: + streams: + - stream: + name: "users" + json_schema: + type: "object" + properties: + id: + type: "number" + col1: + type: "string" + supported_sync_modes: + - "full_refresh" + - "incremental" + default_cursor_field: [] + source_defined_primary_key: + - - "id" + namespace: "public" + sync_mode: "incremental" + cursor_field: + - "id" + destination_sync_mode: "append_dedup" + primary_key: + - - "id" + status: "active" + manual: true diff --git a/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/input_config/STANDARD_SYNC_OPERATION.yaml b/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/input_config/STANDARD_SYNC_OPERATION.yaml new file mode 100644 index 000000000000..fa3750fe1940 --- /dev/null +++ b/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/input_config/STANDARD_SYNC_OPERATION.yaml @@ -0,0 +1,13 @@ +--- +- operationId: "8f75e15a-a427-4f12-94ff-b3130df50f92" + name: "default-normalization" + operatorType: "normalization" + operatorNormalization: + option: "basic" + tombstone: false +- operationId: "a9eecf38-69d2-4361-a975-3bbe8833dd60" + name: "default-normalization" + operatorType: "normalization" + operatorNormalization: + option: "basic" + tombstone: false diff --git 
a/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/output_config/STANDARD_SYNC_OPERATION.yaml b/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/output_config/STANDARD_SYNC_OPERATION.yaml new file mode 100644 index 000000000000..31e97f20f9f4 --- /dev/null +++ b/airbyte-migration/src/test/resources/migrations/migrationV0_28_0/output_config/STANDARD_SYNC_OPERATION.yaml @@ -0,0 +1,17 @@ +--- +# operation attached to a connection. +- operationId: "8f75e15a-a427-4f12-94ff-b3130df50f92" + workspaceId: "f3ba292d-14c7-4a87-93ce-88b715b88eb1" + name: "default-normalization" + operatorType: "normalization" + operatorNormalization: + option: "basic" + tombstone: false +# operation not attached to any connection. +- operationId: "a9eecf38-69d2-4361-a975-3bbe8833dd60" + workspaceId: "5ae6b09b-fdec-41af-aaf7-7d94cfc33ef6" + name: "default-normalization" + operatorType: "normalization" + operatorNormalization: + option: "basic" + tombstone: false diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImport.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImport.java index 2eaee3bfd985..01b25e7177e1 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImport.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImport.java @@ -301,8 +301,7 @@ private Stream readConfigsFromArchive(final Path storageRoot, final Confi } } - private void validateJson(final T config, final ConfigSchema configType) - throws JsonValidationException { + private void validateJson(final T config, final ConfigSchema configType) throws JsonValidationException { JsonNode schema = JsonSchemaValidator.getSchema(configType.getFile()); jsonSchemaValidator.ensure(schema, Jsons.jsonNode(config)); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index b81eeb381962..05f900b38e81 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -55,11 +55,11 @@ import io.airbyte.api.model.Notification; import io.airbyte.api.model.NotificationRead; import io.airbyte.api.model.OperationCreate; -import io.airbyte.api.model.OperationCreateOrUpdate; import io.airbyte.api.model.OperationIdRequestBody; import io.airbyte.api.model.OperationRead; import io.airbyte.api.model.OperationReadList; import io.airbyte.api.model.OperationUpdate; +import io.airbyte.api.model.OperatorConfiguration; import io.airbyte.api.model.SlugRequestBody; import io.airbyte.api.model.SourceCoreConfig; import io.airbyte.api.model.SourceCreate; @@ -425,8 +425,8 @@ public JobInfoRead resetConnection(@Valid ConnectionIdRequestBody connectionIdRe // Operations @Override - public CheckOperationRead checkOperation(OperationCreateOrUpdate operationCreateOrUpdate) { - return execute(() -> operationsHandler.checkOperation(operationCreateOrUpdate)); + public CheckOperationRead checkOperation(OperatorConfiguration operatorConfiguration) { + return execute(() -> operationsHandler.checkOperation(operatorConfiguration)); } @Override diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index 230c7cdaf1bc..aee3aba4e6fd 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ 
b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -38,10 +38,9 @@ import io.airbyte.api.model.ConnectionStatus; import io.airbyte.api.model.ConnectionUpdate; import io.airbyte.api.model.ResourceRequirements; -import io.airbyte.api.model.SyncMode; import io.airbyte.api.model.WorkspaceIdRequestBody; import io.airbyte.commons.enums.Enums; -import io.airbyte.commons.lang.Exceptions; +import io.airbyte.commons.json.Jsons; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Schedule; import io.airbyte.config.StandardDestinationDefinition; @@ -57,7 +56,9 @@ import io.airbyte.workers.WorkerUtils; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -83,12 +84,36 @@ public ConnectionsHandler(final ConfigRepository configRepository, final Workspa this(configRepository, UUID::randomUUID, workspaceHelper); } + private void validateWorkspace(UUID sourceId, UUID destinationId, Set operationIds) { + final UUID sourceWorkspace = workspaceHelper.getWorkspaceForSourceId(sourceId); + final UUID destinationWorkspace = workspaceHelper.getWorkspaceForDestinationId(destinationId); + + Preconditions.checkArgument( + sourceWorkspace.equals(destinationWorkspace), + String.format( + "Source and destination do not belong to the same workspace. Source id: %s, Source workspace id: %s, Destination id: %s, Destination workspace id: %s", + sourceId, + sourceWorkspace, + destinationId, + destinationWorkspace)); + + for (UUID operationId : operationIds) { + final UUID operationWorkspace = workspaceHelper.getWorkspaceForOperationId(operationId); + Preconditions.checkArgument( + sourceWorkspace.equals(operationWorkspace), + String.format( + "Operation and connection do not belong to the same workspace. 
Workspace id: %s, Operation id: %s, Operation workspace id: %s", + sourceWorkspace, + operationId, + operationWorkspace)); + } + } + public ConnectionRead createConnection(ConnectionCreate connectionCreate) throws JsonValidationException, IOException, ConfigNotFoundException { - Exceptions.toRuntime(() -> { - final UUID sourceWorkspace = workspaceHelper.getWorkspaceForSourceId(connectionCreate.getSourceId()); - final UUID destinationWorkspace = workspaceHelper.getWorkspaceForDestinationId(connectionCreate.getDestinationId()); - Preconditions.checkArgument(sourceWorkspace.equals(destinationWorkspace), "Source and destination must belong to the same workspace!"); - }); + // Validate source and destination + configRepository.getSourceConnection(connectionCreate.getSourceId()); + configRepository.getDestinationConnection(connectionCreate.getDestinationId()); + validateWorkspace(connectionCreate.getSourceId(), connectionCreate.getDestinationId(), new HashSet<>(connectionCreate.getOperationIds())); final UUID connectionId = uuidGenerator.get(); @@ -131,10 +156,6 @@ public ConnectionRead createConnection(ConnectionCreate connectionCreate) throws standardSync.withManual(true); } - // Validate source and destination - configRepository.getSourceConnection(connectionCreate.getSourceId()); - configRepository.getDestinationConnection(connectionCreate.getDestinationId()); - configRepository.writeStandardSync(standardSync); trackNewConnection(standardSync); @@ -178,7 +199,11 @@ private Builder generateMetadata(final StandardSync standardSync public ConnectionRead updateConnection(ConnectionUpdate connectionUpdate) throws ConfigNotFoundException, IOException, JsonValidationException { // retrieve and update sync - final StandardSync persistedSync = configRepository.getStandardSync(connectionUpdate.getConnectionId()) + final StandardSync persistedSync = configRepository.getStandardSync(connectionUpdate.getConnectionId()); + + validateWorkspace(persistedSync.getSourceId(), persistedSync.getDestinationId(), new HashSet<>(connectionUpdate.getOperationIds())); + + final StandardSync newConnection = Jsons.clone(persistedSync) .withNamespaceDefinition(Enums.convertTo(connectionUpdate.getNamespaceDefinition(), NamespaceDefinitionType.class)) .withNamespaceFormat(connectionUpdate.getNamespaceFormat()) .withPrefix(connectionUpdate.getPrefix()) @@ -188,13 +213,13 @@ public ConnectionRead updateConnection(ConnectionUpdate connectionUpdate) throws // update Resource Requirements if (connectionUpdate.getResourceRequirements() != null) { - persistedSync.withResourceRequirements(new io.airbyte.config.ResourceRequirements() + newConnection.withResourceRequirements(new io.airbyte.config.ResourceRequirements() .withCpuRequest(connectionUpdate.getResourceRequirements().getCpuRequest()) .withCpuLimit(connectionUpdate.getResourceRequirements().getCpuLimit()) .withMemoryRequest(connectionUpdate.getResourceRequirements().getMemoryRequest()) .withMemoryLimit(connectionUpdate.getResourceRequirements().getMemoryLimit())); } else { - persistedSync.withResourceRequirements(WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); + newConnection.withResourceRequirements(WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); } // update sync schedule @@ -202,12 +227,12 @@ public ConnectionRead updateConnection(ConnectionUpdate connectionUpdate) throws final Schedule newSchedule = new Schedule() .withTimeUnit(toPersistenceTimeUnit(connectionUpdate.getSchedule().getTimeUnit())) .withUnits(connectionUpdate.getSchedule().getUnits()); - 
persistedSync.withManual(false).withSchedule(newSchedule); + newConnection.withManual(false).withSchedule(newSchedule); } else { - persistedSync.withManual(true).withSchedule(null); + newConnection.withManual(true).withSchedule(null); } - configRepository.writeStandardSync(persistedSync); + configRepository.writeStandardSync(newConnection); return buildConnectionRead(connectionUpdate.getConnectionId()); } @@ -308,10 +333,6 @@ private StandardSync.Status toPersistenceStatus(ConnectionStatus apiStatus) { return Enums.convertTo(apiStatus, StandardSync.Status.class); } - private SyncMode toApiSyncMode(io.airbyte.config.SyncMode persistenceStatus) { - return Enums.convertTo(persistenceStatus, SyncMode.class); - } - private ConnectionStatus toApiStatus(StandardSync.Status status) { return Enums.convertTo(status, ConnectionStatus.class); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/OperationsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/OperationsHandler.java index 2257c19bf499..8981684b9d12 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/OperationsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/OperationsHandler.java @@ -31,7 +31,6 @@ import io.airbyte.api.model.CheckOperationRead.StatusEnum; import io.airbyte.api.model.ConnectionIdRequestBody; import io.airbyte.api.model.OperationCreate; -import io.airbyte.api.model.OperationCreateOrUpdate; import io.airbyte.api.model.OperationIdRequestBody; import io.airbyte.api.model.OperationRead; import io.airbyte.api.model.OperationReadList; @@ -70,9 +69,9 @@ public OperationsHandler(final ConfigRepository configRepository) { this(configRepository, UUID::randomUUID); } - public CheckOperationRead checkOperation(OperationCreateOrUpdate operationCreateOrUpdate) { + public CheckOperationRead checkOperation(OperatorConfiguration operationCheck) { try { - toStandardSyncOperation(operationCreateOrUpdate); + validateOperation(operationCheck); } catch (IllegalArgumentException e) { return new CheckOperationRead().status(StatusEnum.FAILED) .message(e.getMessage()); @@ -90,6 +89,7 @@ public OperationRead createOperation(OperationCreate operationCreate) private static StandardSyncOperation toStandardSyncOperation(OperationCreate operationCreate) { final StandardSyncOperation standardSyncOperation = new StandardSyncOperation() + .withWorkspaceId(operationCreate.getWorkspaceId()) .withName(operationCreate.getName()) .withOperatorType(Enums.convertTo(operationCreate.getOperatorConfiguration().getOperatorType(), OperatorType.class)) .withTombstone(false); @@ -109,25 +109,13 @@ private static StandardSyncOperation toStandardSyncOperation(OperationCreate ope return standardSyncOperation; } - private static StandardSyncOperation toStandardSyncOperation(OperationCreateOrUpdate operationCreate) { - final StandardSyncOperation standardSyncOperation = new StandardSyncOperation() - .withName(operationCreate.getName()) - .withOperatorType(Enums.convertTo(operationCreate.getOperatorConfiguration().getOperatorType(), OperatorType.class)) - .withTombstone(false); - if (operationCreate.getOperatorConfiguration().getOperatorType() == io.airbyte.api.model.OperatorType.NORMALIZATION) { - Preconditions.checkArgument(operationCreate.getOperatorConfiguration().getNormalization() != null); - standardSyncOperation.withOperatorNormalization(new OperatorNormalization() - .withOption(Enums.convertTo(operationCreate.getOperatorConfiguration().getNormalization().getOption(), Option.class))); + 
private void validateOperation(OperatorConfiguration operatorConfiguration) { + if (operatorConfiguration.getOperatorType() == io.airbyte.api.model.OperatorType.NORMALIZATION) { + Preconditions.checkArgument(operatorConfiguration.getNormalization() != null); } - if (operationCreate.getOperatorConfiguration().getOperatorType() == io.airbyte.api.model.OperatorType.DBT) { - Preconditions.checkArgument(operationCreate.getOperatorConfiguration().getDbt() != null); - standardSyncOperation.withOperatorDbt(new OperatorDbt() - .withGitRepoUrl(operationCreate.getOperatorConfiguration().getDbt().getGitRepoUrl()) - .withGitRepoBranch(operationCreate.getOperatorConfiguration().getDbt().getGitRepoBranch()) - .withDockerImage(operationCreate.getOperatorConfiguration().getDbt().getDockerImage()) - .withDbtArguments(operationCreate.getOperatorConfiguration().getDbt().getDbtArguments())); + if (operatorConfiguration.getOperatorType() == io.airbyte.api.model.OperatorType.DBT) { + Preconditions.checkArgument(operatorConfiguration.getDbt() != null); } - return standardSyncOperation; } public OperationRead updateOperation(OperationUpdate operationUpdate) @@ -268,6 +256,7 @@ private static OperationRead buildOperationRead(StandardSyncOperation standardSy .dbtArguments(standardSyncOperation.getOperatorDbt().getDbtArguments())); } return new OperationRead() + .workspaceId(standardSyncOperation.getWorkspaceId()) .operationId(standardSyncOperation.getOperationId()) .name(standardSyncOperation.getName()) .operatorConfiguration(operatorConfiguration); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 35838bcde811..329a0978873e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -40,14 +40,12 @@ import io.airbyte.api.model.DestinationIdRequestBody; import io.airbyte.api.model.DestinationRead; import io.airbyte.api.model.JobConfigType; -import io.airbyte.api.model.JobInfoRead; import io.airbyte.api.model.JobListRequestBody; import io.airbyte.api.model.JobRead; import io.airbyte.api.model.JobReadList; import io.airbyte.api.model.JobStatus; import io.airbyte.api.model.JobWithAttemptsRead; import io.airbyte.api.model.OperationCreate; -import io.airbyte.api.model.OperationCreateOrUpdate; import io.airbyte.api.model.OperationReadList; import io.airbyte.api.model.OperationUpdate; import io.airbyte.api.model.SourceDiscoverSchemaRead; @@ -58,6 +56,7 @@ import io.airbyte.api.model.WebBackendConnectionReadList; import io.airbyte.api.model.WebBackendConnectionRequestBody; import io.airbyte.api.model.WebBackendConnectionUpdate; +import io.airbyte.api.model.WebBackendOperationCreateOrUpdate; import io.airbyte.api.model.WorkspaceIdRequestBody; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.MoreBooleans; @@ -260,7 +259,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(WebBackendConnectionU ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(webBackendConnectionUpdate.getConnectionId()); // wait for this to execute - JobInfoRead resetJob = schedulerHandler.resetConnection(connectionId); + schedulerHandler.resetConnection(connectionId); // just create the job schedulerHandler.syncConnection(connectionId); @@ -279,8 +278,8 @@ private List 
createOperations(WebBackendConnectionCreate webBackendConnect private List updateOperations(WebBackendConnectionUpdate webBackendConnectionUpdate) throws JsonValidationException, ConfigNotFoundException, IOException { - final ConnectionRead connectionRead = - connectionsHandler.getConnection(new ConnectionIdRequestBody().connectionId(webBackendConnectionUpdate.getConnectionId())); + final ConnectionRead connectionRead = connectionsHandler + .getConnection(new ConnectionIdRequestBody().connectionId(webBackendConnectionUpdate.getConnectionId())); final List originalOperationIds = new ArrayList<>(connectionRead.getOperationIds()); final List operationIds = new ArrayList<>(); for (var operationCreateOrUpdate : webBackendConnectionUpdate.getOperations()) { @@ -298,7 +297,7 @@ private List updateOperations(WebBackendConnectionUpdate webBackendConnect } @VisibleForTesting - protected static OperationCreate toOperationCreate(OperationCreateOrUpdate operationCreateOrUpdate) { + protected static OperationCreate toOperationCreate(WebBackendOperationCreateOrUpdate operationCreateOrUpdate) { OperationCreate operationCreate = new OperationCreate(); operationCreate.name(operationCreateOrUpdate.getName()); @@ -308,7 +307,7 @@ protected static OperationCreate toOperationCreate(OperationCreateOrUpdate opera } @VisibleForTesting - protected static OperationUpdate toOperationUpdate(OperationCreateOrUpdate operationCreateOrUpdate) { + protected static OperationUpdate toOperationUpdate(WebBackendOperationCreateOrUpdate operationCreateOrUpdate) { OperationUpdate operationUpdate = new OperationUpdate(); operationUpdate.operationId(operationCreateOrUpdate.getOperationId()); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendDestinationHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendDestinationHandler.java index 2a0349e9f7e5..dd0190d176b9 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendDestinationHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendDestinationHandler.java @@ -32,7 +32,6 @@ import io.airbyte.api.model.DestinationIdRequestBody; import io.airbyte.api.model.DestinationRead; import io.airbyte.api.model.DestinationRecreate; -import io.airbyte.commons.lang.Exceptions; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.server.errors.ConnectFailureKnownException; import io.airbyte.server.helpers.WorkspaceHelper; @@ -60,8 +59,8 @@ public WebBackendDestinationHandler(final DestinationHandler destinationHandler, public DestinationRead webBackendRecreateDestinationAndCheck(DestinationRecreate destinationRecreate) throws ConfigNotFoundException, IOException, JsonValidationException { - Exceptions.toRuntime(() -> Preconditions.checkArgument( - workspaceHelper.getWorkspaceForDestinationId(destinationRecreate.getDestinationId()).equals(destinationRecreate.getWorkspaceId()))); + Preconditions.checkArgument( + workspaceHelper.getWorkspaceForDestinationId(destinationRecreate.getDestinationId()).equals(destinationRecreate.getWorkspaceId())); final DestinationCreate destinationCreate = new DestinationCreate(); destinationCreate.setConnectionConfiguration(destinationRecreate.getConnectionConfiguration()); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendSourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendSourceHandler.java index d89368009f0f..0d41e33d2cf8 100644 --- 
a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendSourceHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendSourceHandler.java @@ -32,7 +32,6 @@ import io.airbyte.api.model.SourceIdRequestBody; import io.airbyte.api.model.SourceRead; import io.airbyte.api.model.SourceRecreate; -import io.airbyte.commons.lang.Exceptions; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.server.errors.ConnectFailureKnownException; import io.airbyte.server.helpers.WorkspaceHelper; @@ -58,8 +57,7 @@ public WebBackendSourceHandler(final SourceHandler sourceHandler, final Schedule public SourceRead webBackendRecreateSourceAndCheck(SourceRecreate sourceRecreate) throws ConfigNotFoundException, IOException, JsonValidationException { - Exceptions.toRuntime(() -> Preconditions - .checkArgument(workspaceHelper.getWorkspaceForSourceId(sourceRecreate.getSourceId()).equals(sourceRecreate.getWorkspaceId()))); + Preconditions.checkArgument(workspaceHelper.getWorkspaceForSourceId(sourceRecreate.getSourceId()).equals(sourceRecreate.getWorkspaceId())); final SourceCreate sourceCreate = new SourceCreate(); sourceCreate.setConnectionConfiguration(sourceRecreate.getConnectionConfiguration()); diff --git a/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java b/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java index 4001998fb860..eec3df8b15be 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java +++ b/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java @@ -28,10 +28,12 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobConfig; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.scheduler.models.Job; @@ -41,13 +43,14 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.ExecutionException; -import org.apache.commons.lang3.NotImplementedException; +import org.checkerframework.checker.nullness.qual.NonNull; public class WorkspaceHelper { private final LoadingCache sourceToWorkspaceCache; private final LoadingCache destinationToWorkspaceCache; private final LoadingCache connectionToWorkspaceCache; + private final LoadingCache operationToWorkspaceCache; private final LoadingCache jobToWorkspaceCache; public WorkspaceHelper(ConfigRepository configRepository, JobPersistence jobPersistence) { @@ -55,8 +58,8 @@ public WorkspaceHelper(ConfigRepository configRepository, JobPersistence jobPers this.sourceToWorkspaceCache = getExpiringCache(new CacheLoader<>() { @Override - public UUID load(UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException { - final SourceConnection source = configRepository.getSourceConnection(sourceId);; + public UUID load(@NonNull UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException { + final SourceConnection source = configRepository.getSourceConnection(sourceId); return source.getWorkspaceId(); } @@ -65,7 +68,7 @@ public UUID load(UUID sourceId) throws JsonValidationException, ConfigNotFoundEx 
this.destinationToWorkspaceCache = getExpiringCache(new CacheLoader<>() { @Override - public UUID load(UUID destinationId) throws JsonValidationException, ConfigNotFoundException, IOException { + public UUID load(@NonNull UUID destinationId) throws JsonValidationException, ConfigNotFoundException, IOException { final DestinationConnection destination = configRepository.getDestinationConnection(destinationId); return destination.getWorkspaceId(); } @@ -75,8 +78,8 @@ public UUID load(UUID destinationId) throws JsonValidationException, ConfigNotFo this.connectionToWorkspaceCache = getExpiringCache(new CacheLoader<>() { @Override - public UUID load(UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException, ExecutionException { - final StandardSync connection = configRepository.getStandardSync(connectionId);; + public UUID load(@NonNull UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException { + final StandardSync connection = configRepository.getStandardSync(connectionId); final UUID sourceId = connection.getSourceId(); final UUID destinationId = connection.getDestinationId(); return getWorkspaceForConnection(sourceId, destinationId); @@ -84,10 +87,20 @@ public UUID load(UUID connectionId) throws JsonValidationException, ConfigNotFou }); + this.operationToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + + @Override + public UUID load(@NonNull UUID operationId) throws JsonValidationException, ConfigNotFoundException, IOException { + final StandardSyncOperation operation = configRepository.getStandardSyncOperation(operationId); + return operation.getWorkspaceId(); + } + + }); + this.jobToWorkspaceCache = getExpiringCache(new CacheLoader<>() { @Override - public UUID load(Long jobId) throws IOException, ExecutionException { + public UUID load(@NonNull Long jobId) throws IOException { final Job job = jobPersistence.getJob(jobId); if (job.getConfigType() == JobConfig.ConfigType.SYNC || job.getConfigType() == JobConfig.ConfigType.RESET_CONNECTION) { return getWorkspaceForConnectionId(UUID.fromString(job.getScope())); @@ -99,32 +112,42 @@ public UUID load(Long jobId) throws IOException, ExecutionException { }); } - public UUID getWorkspaceForSourceId(UUID sourceId) throws ExecutionException { - return sourceToWorkspaceCache.get(sourceId); + public UUID getWorkspaceForSourceId(UUID sourceId) { + return swallowExecutionException(() -> sourceToWorkspaceCache.get(sourceId)); } - public UUID getWorkspaceForDestinationId(UUID destinationId) throws ExecutionException { - return destinationToWorkspaceCache.get(destinationId); + public UUID getWorkspaceForDestinationId(UUID destinationId) { + return swallowExecutionException(() -> destinationToWorkspaceCache.get(destinationId)); } - public UUID getWorkspaceForJobId(Long jobId) throws IOException, ExecutionException { - return jobToWorkspaceCache.get(jobId); + public UUID getWorkspaceForJobId(Long jobId) { + return swallowExecutionException(() -> jobToWorkspaceCache.get(jobId)); } - public UUID getWorkspaceForConnection(UUID sourceId, UUID destinationId) throws ExecutionException { + public UUID getWorkspaceForConnection(UUID sourceId, UUID destinationId) { final UUID sourceWorkspace = getWorkspaceForSourceId(sourceId); final UUID destinationWorkspace = getWorkspaceForDestinationId(destinationId); Preconditions.checkArgument(Objects.equals(sourceWorkspace, destinationWorkspace), "Source and destination must be from the same workspace!"); - return sourceWorkspace; + return 
swallowExecutionException(() -> sourceWorkspace); } - public UUID getWorkspaceForConnectionId(UUID connectionId) throws ExecutionException { - return connectionToWorkspaceCache.get(connectionId); + public UUID getWorkspaceForConnectionId(UUID connectionId) { + return swallowExecutionException(() -> connectionToWorkspaceCache.get(connectionId)); } public UUID getWorkspaceForOperationId(UUID operationId) { - throw new NotImplementedException(); + return swallowExecutionException(() -> operationToWorkspaceCache.get(operationId)); + } + + // the ExecutionException is an implementation detail the helper and does not need to be handled by + // callers. + private static UUID swallowExecutionException(CheckedSupplier supplier) { + try { + return supplier.get(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } private static LoadingCache getExpiringCache(CacheLoader cacheLoader) { diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index 89ab300fbda9..bb9e919711a2 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -24,8 +24,9 @@ package io.airbyte.server.handlers; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -45,18 +46,27 @@ import io.airbyte.api.model.SyncMode; import io.airbyte.api.model.WorkspaceIdRequestBody; import io.airbyte.commons.enums.Enums; -import io.airbyte.config.*; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.DataType; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.JobSyncConfig; +import io.airbyte.config.Schedule; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.server.helpers.ConnectionHelpers; -import io.airbyte.server.helpers.SourceHelpers; import io.airbyte.server.helpers.WorkspaceHelper; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.WorkerUtils; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; @@ -67,25 +77,64 @@ class ConnectionsHandlerTest { private ConfigRepository configRepository; private Supplier uuidGenerator; - private StandardSync standardSync; private ConnectionsHandler connectionsHandler; + private UUID workspaceId; + private UUID sourceDefinitionId; + private UUID sourceId; + private UUID destinationDefinitionId; + private UUID destinationId; + private SourceConnection source; + private DestinationConnection destination; + private StandardSync 
standardSync; + private UUID connectionId; + private UUID operationId; + private StandardSyncOperation standardSyncOperation; private WorkspaceHelper workspaceHelper; @SuppressWarnings("unchecked") @BeforeEach - void setUp() throws IOException, ExecutionException { + void setUp() throws IOException, JsonValidationException, ConfigNotFoundException { + workspaceId = UUID.randomUUID(); + sourceDefinitionId = UUID.randomUUID(); + sourceId = UUID.randomUUID(); + destinationDefinitionId = UUID.randomUUID(); + destinationId = UUID.randomUUID(); + connectionId = UUID.randomUUID(); + operationId = UUID.randomUUID(); + source = new SourceConnection() + .withSourceId(sourceId) + .withWorkspaceId(workspaceId); + destination = new DestinationConnection() + .withDestinationId(destinationId) + .withWorkspaceId(workspaceId); + standardSync = new StandardSync() + .withConnectionId(connectionId) + .withName("presto to hudi") + .withNamespaceDefinition(JobSyncConfig.NamespaceDefinitionType.SOURCE) + .withNamespaceFormat(null) + .withPrefix("presto_to_hudi") + .withStatus(StandardSync.Status.ACTIVE) + .withCatalog(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog()) + .withSourceId(sourceId) + .withDestinationId(destinationId) + .withOperationIds(List.of(operationId)) + .withManual(false) + .withSchedule(ConnectionHelpers.generateBasicSchedule()) + .withResourceRequirements(WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); + + standardSyncOperation = new StandardSyncOperation() + .withOperationId(operationId) + .withWorkspaceId(workspaceId); + configRepository = mock(ConfigRepository.class); uuidGenerator = mock(Supplier.class); - - source = SourceHelpers.generateSource(UUID.randomUUID()); - standardSync = ConnectionHelpers.generateSyncWithSourceId(source.getSourceId()); workspaceHelper = mock(WorkspaceHelper.class); - UUID workspaceId = UUID.randomUUID(); - when(workspaceHelper.getWorkspaceForSourceId(any())).thenReturn(workspaceId); - when(workspaceHelper.getWorkspaceForDestinationId(any())).thenReturn(workspaceId); - connectionsHandler = new ConnectionsHandler(configRepository, uuidGenerator, workspaceHelper); + + when(workspaceHelper.getWorkspaceForSourceId(sourceId)).thenReturn(workspaceId); + when(workspaceHelper.getWorkspaceForDestinationId(destinationId)).thenReturn(workspaceId); + when(workspaceHelper.getWorkspaceForOperationId(operationId)).thenReturn(workspaceId); } @Test @@ -129,11 +178,34 @@ void testCreateConnection() throws JsonValidationException, ConfigNotFoundExcept verify(configRepository).writeStandardSync(standardSync); } + @Test + void testValidateConnectionCreateSourceAndDestinationInDifferenceWorkspace() { + when(workspaceHelper.getWorkspaceForDestinationId(destinationId)).thenReturn(UUID.randomUUID()); + + final ConnectionCreate connectionCreate = new ConnectionCreate() + .sourceId(standardSync.getSourceId()) + .destinationId(standardSync.getDestinationId()); + + assertThrows(IllegalArgumentException.class, () -> connectionsHandler.createConnection(connectionCreate)); + } + + @Test + void testValidateConnectionCreateOperationInDifferentWorkspace() { + when(workspaceHelper.getWorkspaceForOperationId(operationId)).thenReturn(UUID.randomUUID()); + + final ConnectionCreate connectionCreate = new ConnectionCreate() + .sourceId(standardSync.getSourceId()) + .destinationId(standardSync.getDestinationId()) + .operationIds(Collections.singletonList(operationId)); + + assertThrows(IllegalArgumentException.class, () -> connectionsHandler.createConnection(connectionCreate)); + } + @Test void 
testCreateConnectionWithBadDefinitionIds() throws JsonValidationException, ConfigNotFoundException, IOException { when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId()); - UUID sourceDefinitionIdBad = UUID.randomUUID(); - UUID destinationDefinitionIdBad = UUID.randomUUID(); + UUID sourceIdBad = UUID.randomUUID(); + UUID destinationIdBad = UUID.randomUUID(); final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() .withName("source-test") @@ -145,15 +217,15 @@ void testCreateConnectionWithBadDefinitionIds() throws JsonValidationException, when(configRepository.getSourceDefinitionFromConnection(standardSync.getConnectionId())).thenReturn(sourceDefinition); when(configRepository.getDestinationDefinitionFromConnection(standardSync.getConnectionId())).thenReturn(destinationDefinition); - when(configRepository.getSourceConnection(sourceDefinitionIdBad)) - .thenThrow(new ConfigNotFoundException(ConfigSchema.SOURCE_CONNECTION, sourceDefinitionIdBad)); - when(configRepository.getDestinationConnection(destinationDefinitionIdBad)) - .thenThrow(new ConfigNotFoundException(ConfigSchema.DESTINATION_CONNECTION, destinationDefinitionIdBad)); + when(configRepository.getSourceConnection(sourceIdBad)) + .thenThrow(new ConfigNotFoundException(ConfigSchema.SOURCE_CONNECTION, sourceIdBad)); + when(configRepository.getDestinationConnection(destinationIdBad)) + .thenThrow(new ConfigNotFoundException(ConfigSchema.DESTINATION_CONNECTION, destinationIdBad)); final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog(); final ConnectionCreate connectionCreateBadSource = new ConnectionCreate() - .sourceId(sourceDefinitionIdBad) + .sourceId(sourceIdBad) .destinationId(standardSync.getDestinationId()) .operationIds(standardSync.getOperationIds()) .name("presto to hudi") @@ -168,7 +240,7 @@ void testCreateConnectionWithBadDefinitionIds() throws JsonValidationException, final ConnectionCreate connectionCreateBadDestination = new ConnectionCreate() .sourceId(standardSync.getSourceId()) - .destinationId(destinationDefinitionIdBad) + .destinationId(destinationIdBad) .operationIds(standardSync.getOperationIds()) .name("presto to hudi") .namespaceDefinition(NamespaceDefinitionType.SOURCE) @@ -193,6 +265,7 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept .namespaceFormat(standardSync.getNamespaceFormat()) .prefix(standardSync.getPrefix()) .connectionId(standardSync.getConnectionId()) + .operationIds(standardSync.getOperationIds()) .status(ConnectionStatus.INACTIVE) .schedule(null) .syncCatalog(catalog); @@ -208,6 +281,7 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept .withPrefix("presto_to_hudi") .withSourceId(standardSync.getSourceId()) .withDestinationId(standardSync.getDestinationId()) + .withOperationIds(standardSync.getOperationIds()) .withStatus(StandardSync.Status.INACTIVE) .withCatalog(configuredCatalog) .withManual(true) @@ -233,6 +307,18 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept verify(configRepository).writeStandardSync(updatedStandardSync); } + @Test + void testValidateConnectionUpdateOperationInDifferentWorkspace() throws JsonValidationException, ConfigNotFoundException, IOException { + when(workspaceHelper.getWorkspaceForOperationId(operationId)).thenReturn(UUID.randomUUID()); + when(configRepository.getStandardSync(standardSync.getConnectionId())).thenReturn(standardSync); + + final ConnectionUpdate connectionUpdate = new ConnectionUpdate() + 
.connectionId(standardSync.getConnectionId()) + .operationIds(Collections.singletonList(operationId)); + + assertThrows(IllegalArgumentException.class, () -> connectionsHandler.updateConnection(connectionUpdate)); + } + @Test void testGetConnection() throws JsonValidationException, ConfigNotFoundException, IOException { when(configRepository.getStandardSync(standardSync.getConnectionId())) @@ -293,7 +379,7 @@ void testDeleteConnection() throws JsonValidationException, IOException, ConfigN } @Test - void failOnUnmatchedWorkspacesInCreate() throws ExecutionException, JsonValidationException, ConfigNotFoundException, IOException { + void failOnUnmatchedWorkspacesInCreate() throws JsonValidationException, ConfigNotFoundException, IOException { when(workspaceHelper.getWorkspaceForSourceId(standardSync.getSourceId())).thenReturn(UUID.randomUUID()); when(workspaceHelper.getWorkspaceForDestinationId(standardSync.getDestinationId())).thenReturn(UUID.randomUUID()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/OperationsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/OperationsHandlerTest.java index 342f9efc1a48..c2f33e2bf0aa 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/OperationsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/OperationsHandlerTest.java @@ -72,6 +72,7 @@ void setUp() throws IOException { operationsHandler = new OperationsHandler(configRepository, uuidGenerator); standardSyncOperation = new StandardSyncOperation() + .withWorkspaceId(UUID.randomUUID()) .withOperationId(UUID.randomUUID()) .withName("presto to hudi") .withOperatorType(io.airbyte.config.StandardSyncOperation.OperatorType.NORMALIZATION) @@ -87,6 +88,7 @@ void testCreateOperation() throws JsonValidationException, ConfigNotFoundExcepti when(configRepository.getStandardSyncOperation(standardSyncOperation.getOperationId())).thenReturn(standardSyncOperation); final OperationCreate operationCreate = new OperationCreate() + .workspaceId(standardSyncOperation.getWorkspaceId()) .name(standardSyncOperation.getName()) .operatorConfiguration(new OperatorConfiguration() .operatorType(OperatorType.NORMALIZATION) @@ -95,6 +97,7 @@ void testCreateOperation() throws JsonValidationException, ConfigNotFoundExcepti final OperationRead actualOperationRead = operationsHandler.createOperation(operationCreate); final OperationRead expectedOperationRead = new OperationRead() + .workspaceId(standardSyncOperation.getWorkspaceId()) .operationId(standardSyncOperation.getOperationId()) .name(standardSyncOperation.getName()) .operatorConfiguration(new OperatorConfiguration() @@ -120,6 +123,7 @@ void testUpdateOperation() throws JsonValidationException, ConfigNotFoundExcepti .dbtArguments("--full-refresh"))); final StandardSyncOperation updatedStandardSyncOperation = new StandardSyncOperation() + .withWorkspaceId(standardSyncOperation.getWorkspaceId()) .withOperationId(standardSyncOperation.getOperationId()) .withName(standardSyncOperation.getName()) .withOperatorType(io.airbyte.config.StandardSyncOperation.OperatorType.DBT) @@ -137,6 +141,7 @@ void testUpdateOperation() throws JsonValidationException, ConfigNotFoundExcepti final OperationRead actualOperationRead = operationsHandler.updateOperation(operationUpdate); final OperationRead expectedOperationRead = new OperationRead() + .workspaceId(standardSyncOperation.getWorkspaceId()) .operationId(standardSyncOperation.getOperationId()) .name(standardSyncOperation.getName()) .operatorConfiguration(new 
OperatorConfiguration() @@ -166,6 +171,7 @@ void testGetOperation() throws JsonValidationException, ConfigNotFoundException, private OperationRead generateOperationRead() { return new OperationRead() + .workspaceId(standardSyncOperation.getWorkspaceId()) .operationId(standardSyncOperation.getOperationId()) .name(standardSyncOperation.getName()) .operatorConfiguration(new OperatorConfiguration() diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 70dd5525e931..3af5f4334194 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -55,7 +55,6 @@ import io.airbyte.api.model.JobStatus; import io.airbyte.api.model.JobWithAttemptsRead; import io.airbyte.api.model.NamespaceDefinitionType; -import io.airbyte.api.model.OperationCreateOrUpdate; import io.airbyte.api.model.OperationRead; import io.airbyte.api.model.OperationReadList; import io.airbyte.api.model.OperationUpdate; @@ -70,6 +69,7 @@ import io.airbyte.api.model.WebBackendConnectionReadList; import io.airbyte.api.model.WebBackendConnectionRequestBody; import io.airbyte.api.model.WebBackendConnectionUpdate; +import io.airbyte.api.model.WebBackendOperationCreateOrUpdate; import io.airbyte.api.model.WorkspaceIdRequestBody; import io.airbyte.commons.enums.Enums; import io.airbyte.config.DestinationConnection; @@ -439,7 +439,7 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept @Test void testUpdateConnectionWithOperations() throws JsonValidationException, ConfigNotFoundException, IOException { - final OperationCreateOrUpdate operationCreateOrUpdate = new OperationCreateOrUpdate() + final WebBackendOperationCreateOrUpdate operationCreateOrUpdate = new WebBackendOperationCreateOrUpdate() .name("Test Operation") .operationId(connectionRead.getOperationIds().get(0)); final OperationUpdate operationUpdate = WebBackendConnectionsHandler.toOperationUpdate(operationCreateOrUpdate); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendDestinationHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendDestinationHandlerTest.java index 886c1cd9547a..89c376eb8fd7 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendDestinationHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendDestinationHandlerTest.java @@ -46,7 +46,6 @@ import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.util.UUID; -import java.util.concurrent.ExecutionException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -63,7 +62,7 @@ public class WebBackendDestinationHandlerTest { private DestinationRead destinationRead; @BeforeEach - public void setup() throws IOException, ExecutionException { + public void setup() throws IOException { destinationHandler = mock(DestinationHandler.class); schedulerHandler = mock(SchedulerHandler.class); workspaceHelper = mock(WorkspaceHelper.class); @@ -150,7 +149,7 @@ public void testRecreateDeletesNewCreatedDestinationWhenFails() throws JsonValid } @Test - public void testUnmatchedWorkspaces() throws ExecutionException, IOException, JsonValidationException, ConfigNotFoundException { + public 
void testUnmatchedWorkspaces() throws IOException, JsonValidationException, ConfigNotFoundException { when(workspaceHelper.getWorkspaceForDestinationId(destinationRead.getDestinationId())).thenReturn(UUID.randomUUID()); DestinationCreate destinationCreate = new DestinationCreate(); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendSourceHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendSourceHandlerTest.java index 3867d75f386b..b0d6705bacf1 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendSourceHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendSourceHandlerTest.java @@ -46,7 +46,6 @@ import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.util.UUID; -import java.util.concurrent.ExecutionException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -63,7 +62,7 @@ public class WebBackendSourceHandlerTest { private SourceRead sourceRead; @BeforeEach - public void setup() throws IOException, ExecutionException { + public void setup() throws IOException { sourceHandler = mock(SourceHandler.class); schedulerHandler = mock(SchedulerHandler.class); workspaceHelper = mock(WorkspaceHelper.class); @@ -115,8 +114,7 @@ public void testReCreatesSourceWhenCheckConnectionSucceeds() throws JsonValidati } @Test - public void testRecreateDeletesNewCreatedSourceWhenFails() - throws JsonValidationException, IOException, ConfigNotFoundException, ExecutionException { + public void testRecreateDeletesNewCreatedSourceWhenFails() throws JsonValidationException, IOException, ConfigNotFoundException { SourceCreate sourceCreate = new SourceCreate(); sourceCreate.setName(sourceRead.getName()); sourceCreate.setConnectionConfiguration(sourceRead.getConnectionConfiguration()); @@ -149,7 +147,7 @@ public void testRecreateDeletesNewCreatedSourceWhenFails() } @Test - public void testUnmatchedWorkspaces() throws ExecutionException, IOException, JsonValidationException, ConfigNotFoundException { + public void testUnmatchedWorkspaces() throws IOException, JsonValidationException, ConfigNotFoundException { when(workspaceHelper.getWorkspaceForSourceId(sourceRead.getSourceId())).thenReturn(UUID.randomUUID()); SourceCreate sourceCreate = new SourceCreate(); diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java index 64c3035e9719..79f30357ce97 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java @@ -28,15 +28,17 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.google.common.util.concurrent.UncheckedExecutionException; import io.airbyte.commons.json.Jsons; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobConfig; import io.airbyte.config.JobSyncConfig; +import io.airbyte.config.OperatorNormalization; +import io.airbyte.config.OperatorNormalization.Option; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.persistence.ConfigRepository; import 
io.airbyte.config.persistence.FileSystemConfigPersistence; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -49,20 +51,54 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.UUID; -import java.util.concurrent.ExecutionException; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class WorkspaceHelperTest { + private static final UUID WORKSPACE_ID = UUID.randomUUID(); + private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID(); + private static final UUID SOURCE_ID = UUID.randomUUID(); + private static final UUID DEST_DEFINITION_ID = UUID.randomUUID(); + private static final UUID DEST_ID = UUID.randomUUID(); + private static final UUID CONNECTION_ID = UUID.randomUUID(); + private static final UUID OPERATION_ID = UUID.randomUUID(); + private static final StandardSourceDefinition SOURCE_DEF = new StandardSourceDefinition().withSourceDefinitionId(SOURCE_DEFINITION_ID); + private static final SourceConnection SOURCE = new SourceConnection() + .withSourceId(SOURCE_ID) + .withSourceDefinitionId(SOURCE_DEFINITION_ID) + .withWorkspaceId(WORKSPACE_ID) + .withConfiguration(Jsons.deserialize("{}")) + .withName("source") + .withTombstone(false); + private static final StandardDestinationDefinition DEST_DEF = new StandardDestinationDefinition().withDestinationDefinitionId(DEST_DEFINITION_ID); + private static final DestinationConnection DEST = new DestinationConnection() + .withDestinationId(DEST_ID) + .withDestinationDefinitionId(DEST_DEFINITION_ID) + .withWorkspaceId(WORKSPACE_ID) + .withConfiguration(Jsons.deserialize("{}")) + .withName("dest") + .withTombstone(false); + private static final StandardSync CONNECTION = new StandardSync() + .withConnectionId(CONNECTION_ID) + .withSourceId(SOURCE_ID) + .withDestinationId(DEST_ID).withCatalog(new ConfiguredAirbyteCatalog().withStreams(new ArrayList<>())) + .withManual(true); + private static final StandardSyncOperation OPERATION = new StandardSyncOperation() + .withOperationId(OPERATION_ID) + .withWorkspaceId(WORKSPACE_ID) + .withName("the new normal") + .withOperatorNormalization(new OperatorNormalization().withOption(Option.BASIC)) + .withTombstone(false); + Path tmpDir; ConfigRepository configRepository; JobPersistence jobPersistence; WorkspaceHelper workspaceHelper; @BeforeEach - public void setup() throws IOException { + public void setup() throws IOException, JsonValidationException { tmpDir = Files.createTempDirectory("workspace_helper_test_" + RandomStringUtils.randomAlphabetic(5)); configRepository = new ConfigRepository(new FileSystemConfigPersistence(tmpDir)); @@ -73,130 +109,96 @@ public void setup() throws IOException { @Test public void testObjectsThatDoNotExist() { - assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForSourceId(UUID.randomUUID())); - assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForDestinationId(UUID.randomUUID())); - assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnectionId(UUID.randomUUID())); - assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnection(UUID.randomUUID(), UUID.randomUUID())); - assertThrows(UncheckedExecutionException.class, () -> workspaceHelper.getWorkspaceForJobId(0L)); - // todo: add operationId check + assertThrows(RuntimeException.class, () -> workspaceHelper.getWorkspaceForSourceId(UUID.randomUUID())); + assertThrows(RuntimeException.class, () -> 
workspaceHelper.getWorkspaceForDestinationId(UUID.randomUUID())); + assertThrows(RuntimeException.class, () -> workspaceHelper.getWorkspaceForConnectionId(UUID.randomUUID())); + assertThrows(RuntimeException.class, () -> workspaceHelper.getWorkspaceForConnection(UUID.randomUUID(), UUID.randomUUID())); + assertThrows(RuntimeException.class, () -> workspaceHelper.getWorkspaceForOperationId(UUID.randomUUID())); + assertThrows(RuntimeException.class, () -> workspaceHelper.getWorkspaceForJobId(0L)); } @Test - public void testSource() throws IOException, ExecutionException, JsonValidationException { - UUID source = UUID.randomUUID(); - UUID workspace = UUID.randomUUID(); - - UUID sourceDefinition = UUID.randomUUID(); - configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition)); - - SourceConnection sourceConnection = new SourceConnection() - .withSourceId(source) - .withSourceDefinitionId(sourceDefinition) - .withWorkspaceId(workspace) - .withConfiguration(Jsons.deserialize("{}")) - .withName("source") - .withTombstone(false); + public void testSource() throws IOException, JsonValidationException { + configRepository.writeStandardSource(SOURCE_DEF); + configRepository.writeSourceConnection(SOURCE); - configRepository.writeSourceConnection(sourceConnection); - UUID retrievedWorkspace = workspaceHelper.getWorkspaceForSourceId(source); - - assertEquals(workspace, retrievedWorkspace); + final UUID retrievedWorkspace = workspaceHelper.getWorkspaceForSourceId(SOURCE_ID); + assertEquals(WORKSPACE_ID, retrievedWorkspace); // check that caching is working - configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(UUID.randomUUID())); - UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForSourceId(source); - assertEquals(workspace, retrievedWorkspaceAfterUpdate); + configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(UUID.randomUUID())); + final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForSourceId(SOURCE_ID); + assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate); } @Test - public void testDestination() throws IOException, ExecutionException, JsonValidationException { - UUID destination = UUID.randomUUID(); - UUID workspace = UUID.randomUUID(); - - UUID destinationDefinition = UUID.randomUUID(); - configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition)); - - DestinationConnection destinationConnection = new DestinationConnection() - .withDestinationId(destination) - .withDestinationDefinitionId(destinationDefinition) - .withWorkspaceId(workspace) - .withConfiguration(Jsons.deserialize("{}")) - .withName("dest") - .withTombstone(false); + public void testDestination() throws IOException, JsonValidationException { + configRepository.writeStandardDestinationDefinition(DEST_DEF); + configRepository.writeDestinationConnection(DEST); - configRepository.writeDestinationConnection(destinationConnection); - UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationId(destination); - - assertEquals(workspace, retrievedWorkspace); + final UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationId(DEST_ID); + assertEquals(WORKSPACE_ID, retrievedWorkspace); // check that caching is working - configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(UUID.randomUUID())); - UUID retrievedWorkspaceAfterUpdate = 
workspaceHelper.getWorkspaceForDestinationId(destination); - assertEquals(workspace, retrievedWorkspaceAfterUpdate); + configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID())); + final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(DEST_ID); + assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate); } @Test - public void testConnectionAndJobs() throws IOException, ExecutionException, JsonValidationException { - UUID workspace = UUID.randomUUID(); - - // set up source - UUID source = UUID.randomUUID(); - - UUID sourceDefinition = UUID.randomUUID(); - configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition)); + public void testConnection() throws IOException, JsonValidationException { + configRepository.writeStandardSource(SOURCE_DEF); + configRepository.writeSourceConnection(SOURCE); + configRepository.writeStandardDestinationDefinition(DEST_DEF); + configRepository.writeDestinationConnection(DEST); - SourceConnection sourceConnection = new SourceConnection() - .withSourceId(source) - .withSourceDefinitionId(sourceDefinition) - .withWorkspaceId(workspace) - .withConfiguration(Jsons.deserialize("{}")) - .withName("source") - .withTombstone(false); - - configRepository.writeSourceConnection(sourceConnection); - - // set up destination - UUID destination = UUID.randomUUID(); + // set up connection + configRepository.writeStandardSync(CONNECTION); - UUID destinationDefinition = UUID.randomUUID(); - configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition)); + // test retrieving by connection id + final UUID retrievedWorkspace = workspaceHelper.getWorkspaceForConnectionId(CONNECTION_ID); + assertEquals(WORKSPACE_ID, retrievedWorkspace); - DestinationConnection destinationConnection = new DestinationConnection() - .withDestinationId(destination) - .withDestinationDefinitionId(destinationDefinition) - .withWorkspaceId(workspace) - .withConfiguration(Jsons.deserialize("{}")) - .withName("dest") - .withTombstone(false); + // test retrieving by source and destination ids + final UUID retrievedWorkspaceBySourceAndDestination = workspaceHelper.getWorkspaceForConnectionId(CONNECTION_ID); + assertEquals(WORKSPACE_ID, retrievedWorkspaceBySourceAndDestination); - configRepository.writeDestinationConnection(destinationConnection); + // check that caching is working + final UUID newWorkspace = UUID.randomUUID(); + configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(newWorkspace)); + configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace)); + final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(DEST_ID); + assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate); + } - // set up connection - UUID connection = UUID.randomUUID(); - configRepository.writeStandardSync(new StandardSync().withManual(true).withConnectionId(connection).withSourceId(source) - .withDestinationId(destination).withCatalog(new ConfiguredAirbyteCatalog().withStreams(new ArrayList<>()))); + @Test + public void testOperation() throws IOException, JsonValidationException { + configRepository.writeStandardSyncOperation(OPERATION); // test retrieving by connection id - UUID retrievedWorkspace = workspaceHelper.getWorkspaceForConnectionId(connection); - assertEquals(workspace, retrievedWorkspace); - - // test retrieving by source and destination 
ids - UUID retrievedWorkspaceBySourceAndDestination = workspaceHelper.getWorkspaceForConnectionId(connection); - assertEquals(workspace, retrievedWorkspaceBySourceAndDestination); + final UUID retrievedWorkspace = workspaceHelper.getWorkspaceForOperationId(OPERATION_ID); + assertEquals(WORKSPACE_ID, retrievedWorkspace); // check that caching is working - UUID newWorkspace = UUID.randomUUID(); - configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(newWorkspace)); - configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(newWorkspace)); - UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination); - assertEquals(workspace, retrievedWorkspaceAfterUpdate); + configRepository.writeStandardSyncOperation(Jsons.clone(OPERATION).withWorkspaceId(UUID.randomUUID())); + final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForOperationId(OPERATION_ID); + assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate); + } + + @Test + public void testConnectionAndJobs() throws IOException, JsonValidationException { + configRepository.writeStandardSource(SOURCE_DEF); + configRepository.writeSourceConnection(SOURCE); + configRepository.writeStandardDestinationDefinition(DEST_DEF); + configRepository.writeDestinationConnection(DEST); + configRepository.writeStandardSync(CONNECTION); // test jobs - long jobId = 123; - Job job = new Job( + final long jobId = 123; + final Job job = new Job( jobId, JobConfig.ConfigType.SYNC, - connection.toString(), + CONNECTION_ID.toString(), new JobConfig().withConfigType(JobConfig.ConfigType.SYNC).withSync(new JobSyncConfig()), new ArrayList<>(), JobStatus.PENDING, @@ -205,8 +207,8 @@ public void testConnectionAndJobs() throws IOException, ExecutionException, Json System.currentTimeMillis()); when(jobPersistence.getJob(jobId)).thenReturn(job); - UUID jobWorkspace = workspaceHelper.getWorkspaceForJobId(jobId); - assertEquals(workspace, jobWorkspace); + final UUID jobWorkspace = workspaceHelper.getWorkspaceForJobId(jobId); + assertEquals(WORKSPACE_ID, jobWorkspace); } } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 0559f23a7553..37cb9128122c 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -908,7 +908,8 @@ private OperationRead createOperation() throws ApiException { .operatorType(OperatorType.NORMALIZATION).normalization(new OperatorNormalization().option( OptionEnum.BASIC)); - OperationCreate operationCreate = new OperationCreate() + final OperationCreate operationCreate = new OperationCreate() + .workspaceId(PersistenceConstants.DEFAULT_WORKSPACE_ID) .name("AccTestDestination-" + UUID.randomUUID()).operatorConfiguration(normalizationConfig); OperationRead operation = apiClient.getOperationApi().createOperation(operationCreate); diff --git a/airbyte-webapp/package-lock.json b/airbyte-webapp/package-lock.json index d56b2d23c60f..2eaa7f0c3e4c 100644 --- a/airbyte-webapp/package-lock.json +++ b/airbyte-webapp/package-lock.json @@ -1,6 +1,6 @@ { "name": "airbyte-webapp", - "version": "0.27.5-alpha", + "version": "0.28.0-alpha", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/airbyte-webapp/package.json b/airbyte-webapp/package.json index 85d698c7604c..27bc2cfb37c7 100644 
--- a/airbyte-webapp/package.json +++ b/airbyte-webapp/package.json @@ -1,6 +1,6 @@ { "name": "airbyte-webapp", - "version": "0.27.5-alpha", + "version": "0.28.0-alpha", "private": true, "scripts": { "start": "react-scripts start", diff --git a/airbyte-webapp/src/core/domain/connection/operation.ts b/airbyte-webapp/src/core/domain/connection/operation.ts index 01d7faf5bad5..1c35cdb4832a 100644 --- a/airbyte-webapp/src/core/domain/connection/operation.ts +++ b/airbyte-webapp/src/core/domain/connection/operation.ts @@ -1,6 +1,7 @@ export interface Operation { name: string; id?: string; + workspaceId: string; operatorConfiguration: | DbtOperationConfiguration | NormalizationOperationConfiguration; diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx index c5cd05443e1b..86a528fab29f 100644 --- a/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx @@ -23,6 +23,7 @@ import { DestinationDefinitionSpecification } from "core/resources/DestinationDe import { Connection, ScheduleProperties } from "core/resources/Connection"; import { ConnectionNamespaceDefinition } from "core/domain/connection"; import { SOURCE_NAMESPACE_TAG } from "core/domain/connector/source"; +import config from "config"; type FormikConnectionFormValues = { schedule?: ScheduleProperties | null; @@ -45,6 +46,7 @@ const SUPPORTED_MODES: [SyncMode, DestinationSyncMode][] = [ const DEFAULT_TRANSFORMATION: Transformation = { name: "My dbt transformations", + workspaceId: config.ui.workspaceId, operatorConfiguration: { operatorType: OperatorType.Dbt, dbt: { @@ -157,6 +159,7 @@ function mapFormPropsToOperation( } else { newOperations.push({ name: "Normalization", + workspaceId: config.ui.workspaceId, operatorConfiguration: { operatorType: OperatorType.Normalization, normalization: { diff --git a/docs/operator-guides/upgrading-airbyte.md b/docs/operator-guides/upgrading-airbyte.md index 096695282d19..23f01af760b4 100644 --- a/docs/operator-guides/upgrading-airbyte.md +++ b/docs/operator-guides/upgrading-airbyte.md @@ -81,7 +81,7 @@ If you are upgrading from (i.e. your current version of Airbyte is) Airbyte ver Here's an example of what it might look like with the values filled in. It assumes that the downloaded `airbyte_archive.tar.gz` is in `/tmp`. ```bash - docker run --rm -v /tmp:/config airbyte/migration:0.27.5-alpha --\ + docker run --rm -v /tmp:/config airbyte/migration:0.28.0-alpha --\ --input /config/airbyte_archive.tar.gz\ --output /config/airbyte_archive_migrated.tar.gz ``` diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index bbd18fcce856..ee86d4f89f96 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -2495,7 +2495,7 @@

[Regenerated HTML API reference; summary of the remaining hunks for this file: a required request body parameter changes from OperationCreateOrUpdate to OperatorConfiguration; operation example payloads gain "workspaceId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91"; the model table of contents replaces OperationCreateOrUpdate with WebBackendOperationCreateOrUpdate; OperationCreate and OperationRead gain a workspaceId field (UUID format: uuid); the operations (optional) entry on WebBackendConnectionUpdate is relinked to the new model; and a new WebBackendOperationCreateOrUpdate model (operationId (optional), name, operatorConfiguration) is documented.]
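To make the documented schema change concrete, here is a minimal sketch of creating an operation now that OperationCreate carries a workspaceId. It mirrors the acceptance-test hunk earlier in this diff; the import packages, the AirbyteApiClient wrapper, and the helper method shown here are assumptions for illustration rather than part of this change.

```java
// Illustrative sketch only; package names are assumed and may differ from the generated client in use.
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.ApiException;
import io.airbyte.api.client.model.OperationCreate;
import io.airbyte.api.client.model.OperationRead;
import io.airbyte.api.client.model.OperatorConfiguration;
import io.airbyte.api.client.model.OperatorNormalization;
import io.airbyte.api.client.model.OperatorNormalization.OptionEnum;
import io.airbyte.api.client.model.OperatorType;
import java.util.UUID;

public class CreateOperationExample {

  // Creates a basic-normalization operation scoped to the given workspace.
  static OperationRead createBasicNormalization(AirbyteApiClient apiClient, UUID workspaceId) throws ApiException {
    final OperatorConfiguration normalizationConfig = new OperatorConfiguration()
        .operatorType(OperatorType.NORMALIZATION)
        .normalization(new OperatorNormalization().option(OptionEnum.BASIC));

    final OperationCreate operationCreate = new OperationCreate()
        .workspaceId(workspaceId) // workspaceId is now a required field on OperationCreate
        .name("basic-normalization")
        .operatorConfiguration(normalizationConfig);

    return apiClient.getOperationApi().createOperation(operationCreate);
  }

}
```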
    diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index 98b73f0612d2..f65ff3aafac7 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -1,4 +1,4 @@ -AIRBYTE_VERSION=0.27.5-alpha +AIRBYTE_VERSION=0.28.0-alpha # Airbyte Internal Database, see https://docs.airbyte.io/operator-guides/configuring-airbyte-db DATABASE_USER=docker diff --git a/kube/overlays/stable-with-resource-limits/kustomization.yaml b/kube/overlays/stable-with-resource-limits/kustomization.yaml index 1f8fa1ffd8b5..d34163886056 100644 --- a/kube/overlays/stable-with-resource-limits/kustomization.yaml +++ b/kube/overlays/stable-with-resource-limits/kustomization.yaml @@ -8,15 +8,15 @@ bases: images: - name: airbyte/seed - newTag: 0.27.5-alpha + newTag: 0.28.0-alpha - name: airbyte/db - newTag: 0.27.5-alpha + newTag: 0.28.0-alpha - name: airbyte/scheduler - newTag: 0.27.5-alpha + newTag: 0.28.0-alpha - name: airbyte/server - newTag: 0.27.5-alpha + newTag: 0.28.0-alpha - name: airbyte/webapp - newTag: 0.27.5-alpha + newTag: 0.28.0-alpha - name: temporalio/auto-setup newTag: 1.7.0 diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index 98b73f0612d2..f65ff3aafac7 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -1,4 +1,4 @@ -AIRBYTE_VERSION=0.27.5-alpha +AIRBYTE_VERSION=0.28.0-alpha # Airbyte Internal Database, see https://docs.airbyte.io/operator-guides/configuring-airbyte-db DATABASE_USER=docker diff --git a/kube/overlays/stable/kustomization.yaml b/kube/overlays/stable/kustomization.yaml index 1365ed98e985..dd4ae1dc74b9 100644 --- a/kube/overlays/stable/kustomization.yaml +++ b/kube/overlays/stable/kustomization.yaml @@ -8,15 +8,15 @@ bases: images: - name: airbyte/seed - newTag: 0.27.5-alpha + newTag: 0.28.0-alpha - name: airbyte/db - newTag: 0.27.5-alpha + newTag: 0.28.0-alpha - name: airbyte/scheduler - newTag: 0.27.5-alpha + newTag: 0.28.0-alpha - name: airbyte/server - newTag: 0.27.5-alpha + newTag: 0.28.0-alpha - name: airbyte/webapp - newTag: 0.27.5-alpha + newTag: 0.28.0-alpha - name: temporalio/auto-setup newTag: 1.7.0
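A closing note on the WorkspaceHelper hunk at the top of this section: workspace look-ups go through expiring caches, and the checked ExecutionException thrown by cache look-ups is rethrown unchecked so handler code does not have to declare it. Below is a minimal, self-contained sketch of that pattern, assuming Guava's CacheBuilder and LoadingCache; the class name, loader, and cache settings are illustrative stand-ins, not the actual helper.

```java
// Sketch of the expiring-cache plus unchecked-exception pattern; not the real WorkspaceHelper.
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CachedWorkspaceLookup {

  private final LoadingCache<UUID, UUID> operationToWorkspaceCache;

  public CachedWorkspaceLookup(Map<UUID, UUID> operationIdToWorkspaceId) {
    // Hypothetical loader: the real helper resolves the workspace from the config store instead.
    this.operationToWorkspaceCache = getExpiringCache(new CacheLoader<UUID, UUID>() {

      @Override
      public UUID load(UUID operationId) {
        final UUID workspaceId = operationIdToWorkspaceId.get(operationId);
        if (workspaceId == null) {
          // unchecked failures surface to callers as runtime exceptions as well
          throw new IllegalArgumentException("no workspace found for operation " + operationId);
        }
        return workspaceId;
      }

    });
  }

  public UUID getWorkspaceForOperationId(UUID operationId) {
    return swallowExecutionException(() -> operationToWorkspaceCache.get(operationId));
  }

  // The checked ExecutionException is an implementation detail of the cache and does not need to
  // be handled by callers, so it is rethrown as a RuntimeException.
  private static <T> T swallowExecutionException(CheckedSupplier<T> supplier) {
    try {
      return supplier.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
  }

  private static <K, V> LoadingCache<K, V> getExpiringCache(CacheLoader<K, V> cacheLoader) {
    // Entries are bounded and expire so that stale workspace mappings eventually refresh.
    return CacheBuilder.newBuilder()
        .maximumSize(20_000)
        .expireAfterWrite(10, TimeUnit.MINUTES)
        .build(cacheLoader);
  }

  @FunctionalInterface
  private interface CheckedSupplier<T> {

    T get() throws ExecutionException;

  }

}
```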