From fb9efb378dc52f207dab51a8be0e3f4192f809d5 Mon Sep 17 00:00:00 2001 From: Parker Mossman Date: Mon, 10 Oct 2022 13:34:19 -0700 Subject: [PATCH] Add Workspace and Connection Geography Support to API (#17650) * progress on adding geography throughout api * fix workspace handler test * more progress * implement workspace defaulting and add/update more tests * fix bootloader tests * set defaultGeography in missing places * add Geography column when reading Connection record from DB * fix pmd * add more comments/description * format * description --- .../TrackingClientSingletonTest.java | 7 +- airbyte-api/src/main/openapi/config.yaml | 52 +++++++ .../io/airbyte/bootloader/BootloaderApp.java | 4 +- .../airbyte/bootloader/BootloaderAppTest.java | 4 +- .../src/main/resources/types/Geography.yaml | 10 ++ .../main/resources/types/StandardSync.yaml | 3 + .../resources/types/StandardWorkspace.yaml | 3 + .../DatabaseConfigPersistence.java | 16 +- .../config/persistence/DbConverter.java | 17 +- .../BaseDatabaseConfigPersistenceTest.java | 7 +- ...DatabaseConfigPersistenceLoadDataTest.java | 4 +- .../airbyte/config/persistence/MockData.java | 28 ++-- .../airbyte/server/apis/ConfigurationApi.java | 9 ++ .../server/converters/ApiPojoConverters.java | 12 +- .../server/handlers/ConnectionsHandler.java | 24 ++- .../WebBackendConnectionsHandler.java | 5 +- .../WebBackendGeographiesHandler.java | 33 ++++ .../server/handlers/WorkspacesHandler.java | 17 +- .../handlers/helpers/ConnectionMatcher.java | 1 + .../handlers/ConnectionsHandlerTest.java | 146 ++++++++++++------ .../WebBackendConnectionsHandlerTest.java | 31 ++-- .../WebBackendGeographiesHandlerTest.java | 43 ++++++ .../handlers/WorkspacesHandlerTest.java | 48 ++++-- .../server/helpers/ConnectionHelpers.java | 14 +- .../api/generated-api-html/index.html | 68 ++++++++ 25 files changed, 498 insertions(+), 108 deletions(-) create mode 100644 airbyte-config/config-models/src/main/resources/types/Geography.yaml create mode 100644 airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendGeographiesHandler.java create mode 100644 airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendGeographiesHandlerTest.java diff --git a/airbyte-analytics/src/test/java/io/airbyte/analytics/TrackingClientSingletonTest.java b/airbyte-analytics/src/test/java/io/airbyte/analytics/TrackingClientSingletonTest.java index fb6908c051e2..782936f09a5a 100644 --- a/airbyte-analytics/src/test/java/io/airbyte/analytics/TrackingClientSingletonTest.java +++ b/airbyte-analytics/src/test/java/io/airbyte/analytics/TrackingClientSingletonTest.java @@ -12,6 +12,7 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.Geography; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; @@ -111,7 +112,8 @@ void testGetTrackingIdentityNonAnonymous() throws JsonValidationException, IOExc .withEmail(EMAIL) .withAnonymousDataCollection(false) .withNews(true) - .withSecurityUpdates(true); + .withSecurityUpdates(true) + .withDefaultGeography(Geography.AUTO); when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)).thenReturn(workspace); @@ -129,7 +131,8 @@ void testGetTrackingIdentityAnonymous() throws JsonValidationException, IOExcept .withEmail("a@airbyte.io") .withAnonymousDataCollection(true) .withNews(true) - .withSecurityUpdates(true); + .withSecurityUpdates(true) + .withDefaultGeography(Geography.AUTO); when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)).thenReturn(workspace); diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 6d13ebe89e34..7e90df6f3e70 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2036,6 +2036,24 @@ paths: $ref: "#/components/responses/NotFoundResponse" "422": $ref: "#/components/responses/InvalidInputResponse" + /v1/web_backend/geographies/list: + post: + tags: + - web_backend + description: Returns all available geographies in which a data sync can run. + summary: | + Returns available geographies can be selected to run data syncs in a particular geography. + The 'auto' entry indicates that the sync will be automatically assigned to a geography according + to the platform default behavior. Entries other than 'auto' are two-letter country codes that + follow the ISO 3166-1 alpha-2 standard. + operationId: webBackendListGeographies + responses: + "200": + description: Successful operation + content: + application/json: + schema: + $ref: "#/components/schemas/WebBackendGeographiesListResult" /v1/jobs/list: post: tags: @@ -2260,6 +2278,8 @@ components: $ref: "#/components/schemas/Notification" displaySetupWizard: type: boolean + defaultGeography: + $ref: "#/components/schemas/Geography" Notification: type: object required: @@ -2362,6 +2382,8 @@ components: type: boolean feedbackDone: type: boolean + defaultGeography: + $ref: "#/components/schemas/Geography" WorkspaceUpdateName: type: object required: @@ -2397,6 +2419,8 @@ components: type: array items: $ref: "#/components/schemas/Notification" + defaultGeography: + $ref: "#/components/schemas/Geography" WorkspaceGiveFeedback: type: object required: @@ -2424,6 +2448,15 @@ components: type: boolean hasDestinations: type: boolean + WebBackendGeographiesListResult: + type: object + required: + - geographies + properties: + geographies: + type: array + items: + $ref: "#/components/schemas/Geography" # SLUG SlugRequestBody: type: object @@ -2432,6 +2465,13 @@ components: properties: slug: type: string + # Geography + Geography: + type: string + enum: + - auto + - us + - eu # SourceDefinition SourceDefinitionId: type: string @@ -3159,6 +3199,8 @@ components: sourceCatalogId: type: string format: uuid + geography: + $ref: "#/components/schemas/Geography" WebBackendConnectionCreate: type: object required: @@ -3206,6 +3248,8 @@ components: sourceCatalogId: type: string format: uuid + geography: + $ref: "#/components/schemas/Geography" ConnectionStateCreateOrUpdate: type: object required: @@ -3256,6 +3300,8 @@ components: sourceCatalogId: type: string format: uuid + geography: + $ref: "#/components/schemas/Geography" WebBackendConnectionUpdate: type: object description: Used to apply a patch-style update to a connection, which means that null properties remain unchanged @@ -3298,6 +3344,8 @@ components: sourceCatalogId: type: string format: uuid + geography: + $ref: "#/components/schemas/Geography" ConnectionRead: type: object required: @@ -3345,6 +3393,8 @@ components: sourceCatalogId: type: string format: uuid + geography: + $ref: "#/components/schemas/Geography" ConnectionSearch: type: object properties: @@ -4619,6 +4669,8 @@ components: format: uuid catalogDiff: $ref: "#/components/schemas/CatalogDiff" + geography: + $ref: "#/components/schemas/Geography" WebBackendConnectionReadList: type: object required: diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java index 5de66219c0b1..59402fea95b8 100644 --- a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java @@ -12,6 +12,7 @@ import io.airbyte.commons.version.Version; import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; +import io.airbyte.config.Geography; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.init.ApplyDefinitionsHelper; import io.airbyte.config.init.DefinitionsProvider; @@ -286,7 +287,8 @@ private static void createWorkspaceIfNoneExists(final ConfigRepository configRep .withSlug(workspaceId.toString()) .withInitialSetupComplete(false) .withDisplaySetupWizard(true) - .withTombstone(false); + .withTombstone(false) + .withDefaultGeography(Geography.AUTO); configRepository.writeStandardWorkspace(workspace); } diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index d284a6f02b86..37602c7c27a5 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -21,6 +21,7 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.commons.version.Version; import io.airbyte.config.Configs; +import io.airbyte.config.Geography; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.init.YamlSeedConfigPersistence; @@ -222,7 +223,8 @@ void testBootloaderAppRunSecretMigration() throws Exception { .withSlug("wSlug") .withEmail("email@mail.com") .withTombstone(false) - .withInitialSetupComplete(false)); + .withInitialSetupComplete(false) + .withDefaultGeography(Geography.AUTO)); final UUID sourceId = UUID.randomUUID(); configRepository.writeSourceConnectionNoSecrets(new SourceConnection() .withSourceDefinitionId(UUID.fromString("e7778cfc-e97c-4458-9ecb-b4f2bba8946c")) // Facebook Marketing diff --git a/airbyte-config/config-models/src/main/resources/types/Geography.yaml b/airbyte-config/config-models/src/main/resources/types/Geography.yaml new file mode 100644 index 000000000000..f545f53c1d24 --- /dev/null +++ b/airbyte-config/config-models/src/main/resources/types/Geography.yaml @@ -0,0 +1,10 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/Geography.yaml +title: Geography +description: Geography Setting +type: string +enum: + - auto + - us + - eu diff --git a/airbyte-config/config-models/src/main/resources/types/StandardSync.yaml b/airbyte-config/config-models/src/main/resources/types/StandardSync.yaml index 62755c70aff8..276428f8e112 100644 --- a/airbyte-config/config-models/src/main/resources/types/StandardSync.yaml +++ b/airbyte-config/config-models/src/main/resources/types/StandardSync.yaml @@ -11,6 +11,7 @@ required: - catalog - manual - namespaceDefinition + - geography additionalProperties: false properties: namespaceDefinition: @@ -114,3 +115,5 @@ properties: format: uuid resourceRequirements: "$ref": ResourceRequirements.yaml + geography: + "$ref": Geography.yaml diff --git a/airbyte-config/config-models/src/main/resources/types/StandardWorkspace.yaml b/airbyte-config/config-models/src/main/resources/types/StandardWorkspace.yaml index 44a44070ae6b..dd65857f7dbd 100644 --- a/airbyte-config/config-models/src/main/resources/types/StandardWorkspace.yaml +++ b/airbyte-config/config-models/src/main/resources/types/StandardWorkspace.yaml @@ -9,6 +9,7 @@ required: - name - slug - initialSetupComplete + - defaultGeography additionalProperties: false properties: workspaceId: @@ -47,3 +48,5 @@ properties: type: boolean feedbackDone: type: boolean + defaultGeography: + "$ref": Geography.yaml diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index 3332c32fe5b0..e512ae14f72d 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -1087,12 +1087,16 @@ private void writeStandardSync(final List configs, final DSLContex .set(CONNECTION.MANUAL, standardSync.getManual()) .set(CONNECTION.SCHEDULE_TYPE, standardSync.getScheduleType() == null ? null - : Enums.toEnum(standardSync.getScheduleType().value(), io.airbyte.db.instance.configs.jooq.generated.enums.ScheduleType.class) + : Enums.toEnum(standardSync.getScheduleType().value(), + io.airbyte.db.instance.configs.jooq.generated.enums.ScheduleType.class) .orElseThrow()) .set(CONNECTION.SCHEDULE_DATA, JSONB.valueOf(Jsons.serialize(standardSync.getScheduleData()))) - .set(CONNECTION.RESOURCE_REQUIREMENTS, JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements()))) + .set(CONNECTION.RESOURCE_REQUIREMENTS, + JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements()))) .set(CONNECTION.UPDATED_AT, timestamp) .set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId()) + .set(CONNECTION.GEOGRAPHY, Enums.toEnum(standardSync.getGeography().value(), + io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType.class).orElseThrow()) .where(CONNECTION.ID.eq(standardSync.getConnectionId())) .execute(); @@ -1126,11 +1130,15 @@ private void writeStandardSync(final List configs, final DSLContex .set(CONNECTION.MANUAL, standardSync.getManual()) .set(CONNECTION.SCHEDULE_TYPE, standardSync.getScheduleType() == null ? null - : Enums.toEnum(standardSync.getScheduleType().value(), io.airbyte.db.instance.configs.jooq.generated.enums.ScheduleType.class) + : Enums.toEnum(standardSync.getScheduleType().value(), + io.airbyte.db.instance.configs.jooq.generated.enums.ScheduleType.class) .orElseThrow()) .set(CONNECTION.SCHEDULE_DATA, JSONB.valueOf(Jsons.serialize(standardSync.getScheduleData()))) - .set(CONNECTION.RESOURCE_REQUIREMENTS, JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements()))) + .set(CONNECTION.RESOURCE_REQUIREMENTS, + JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements()))) .set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId()) + .set(CONNECTION.GEOGRAPHY, Enums.toEnum(standardSync.getGeography().value(), + io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType.class).orElseThrow()) .set(CONNECTION.CREATED_AT, timestamp) .set(CONNECTION.UPDATED_AT, timestamp) .execute(); diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java index f0179e982171..44805357c6d1 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java @@ -18,6 +18,7 @@ import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; +import io.airbyte.config.Geography; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Notification; import io.airbyte.config.ResourceRequirements; @@ -60,16 +61,20 @@ public static StandardSync buildStandardSync(final Record record, final List standardWorkspaces() { .withTombstone(false) .withNotifications(Collections.singletonList(notification)) .withFirstCompletedSync(true) - .withFeedbackDone(true); + .withFeedbackDone(true) + .withDefaultGeography(Geography.AUTO); final StandardWorkspace workspace2 = new StandardWorkspace() .withWorkspaceId(WORKSPACE_ID_2) .withName("Another Workspace") .withSlug("another-workspace") .withInitialSetupComplete(true) - .withTombstone(false); + .withTombstone(false) + .withDefaultGeography(Geography.AUTO); final StandardWorkspace workspace3 = new StandardWorkspace() .withWorkspaceId(WORKSPACE_ID_3) .withName("Tombstoned") .withSlug("tombstoned") .withInitialSetupComplete(true) - .withTombstone(true); + .withTombstone(true) + .withDefaultGeography(Geography.AUTO); return Arrays.asList(workspace1, workspace2, workspace3); } @@ -448,7 +452,8 @@ public static List standardSyncs() { .withPrefix("") .withResourceRequirements(resourceRequirements) .withStatus(Status.ACTIVE) - .withSchedule(schedule); + .withSchedule(schedule) + .withGeography(Geography.AUTO); final StandardSync standardSync2 = new StandardSync() .withOperationIds(Arrays.asList(OPERATION_ID_1, OPERATION_ID_2)) @@ -463,7 +468,8 @@ public static List standardSyncs() { .withPrefix("") .withResourceRequirements(resourceRequirements) .withStatus(Status.ACTIVE) - .withSchedule(schedule); + .withSchedule(schedule) + .withGeography(Geography.AUTO); final StandardSync standardSync3 = new StandardSync() .withOperationIds(Arrays.asList(OPERATION_ID_1, OPERATION_ID_2)) @@ -478,7 +484,8 @@ public static List standardSyncs() { .withPrefix("") .withResourceRequirements(resourceRequirements) .withStatus(Status.ACTIVE) - .withSchedule(schedule); + .withSchedule(schedule) + .withGeography(Geography.AUTO); final StandardSync standardSync4 = new StandardSync() .withOperationIds(Collections.emptyList()) @@ -493,7 +500,8 @@ public static List standardSyncs() { .withPrefix("") .withResourceRequirements(resourceRequirements) .withStatus(Status.DEPRECATED) - .withSchedule(schedule); + .withSchedule(schedule) + .withGeography(Geography.AUTO); final StandardSync standardSync5 = new StandardSync() .withOperationIds(Arrays.asList(OPERATION_ID_3)) @@ -508,7 +516,8 @@ public static List standardSyncs() { .withPrefix("") .withResourceRequirements(resourceRequirements) .withStatus(Status.ACTIVE) - .withSchedule(schedule); + .withSchedule(schedule) + .withGeography(Geography.AUTO); final StandardSync standardSync6 = new StandardSync() .withOperationIds(Arrays.asList()) @@ -523,7 +532,8 @@ public static List standardSyncs() { .withPrefix("") .withResourceRequirements(resourceRequirements) .withStatus(Status.DEPRECATED) - .withSchedule(schedule); + .withSchedule(schedule) + .withGeography(Geography.AUTO); return Arrays.asList(standardSync1, standardSync2, standardSync3, standardSync4, standardSync5, standardSync6); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index ba39d020aaaa..92236e6bc751 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -90,6 +90,7 @@ import io.airbyte.api.model.generated.WebBackendConnectionReadList; import io.airbyte.api.model.generated.WebBackendConnectionRequestBody; import io.airbyte.api.model.generated.WebBackendConnectionUpdate; +import io.airbyte.api.model.generated.WebBackendGeographiesListResult; import io.airbyte.api.model.generated.WebBackendWorkspaceState; import io.airbyte.api.model.generated.WebBackendWorkspaceStateResult; import io.airbyte.api.model.generated.WorkspaceCreate; @@ -128,6 +129,7 @@ import io.airbyte.server.handlers.SourceHandler; import io.airbyte.server.handlers.StateHandler; import io.airbyte.server.handlers.WebBackendConnectionsHandler; +import io.airbyte.server.handlers.WebBackendGeographiesHandler; import io.airbyte.server.handlers.WorkspacesHandler; import io.airbyte.server.scheduler.EventRunner; import io.airbyte.server.scheduler.SynchronousSchedulerClient; @@ -156,6 +158,7 @@ public class ConfigurationApi implements io.airbyte.api.generated.V1Api { private final StateHandler stateHandler; private final JobHistoryHandler jobHistoryHandler; private final WebBackendConnectionsHandler webBackendConnectionsHandler; + private final WebBackendGeographiesHandler webBackendGeographiesHandler; private final HealthCheckHandler healthCheckHandler; private final LogsHandler logsHandler; private final OpenApiConfigHandler openApiConfigHandler; @@ -236,6 +239,7 @@ public ConfigurationApi(final ConfigRepository configRepository, operationsHandler, eventRunner, configRepository); + webBackendGeographiesHandler = new WebBackendGeographiesHandler(); healthCheckHandler = new HealthCheckHandler(configRepository); logsHandler = new LogsHandler(); openApiConfigHandler = new OpenApiConfigHandler(); @@ -796,6 +800,11 @@ public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final return execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody)); } + @Override + public WebBackendGeographiesListResult webBackendListGeographies() { + return execute(webBackendGeographiesHandler::listGeographiesOSS); + } + @Override public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnectionRequestBody webBackendConnectionRequestBody) { return execute(() -> webBackendConnectionsHandler.webBackendGetConnection(webBackendConnectionRequestBody)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java b/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java index 73d9e0f61220..e3f6b3d81282 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java @@ -11,6 +11,7 @@ import io.airbyte.api.model.generated.ConnectionScheduleDataBasicSchedule; import io.airbyte.api.model.generated.ConnectionScheduleDataCron; import io.airbyte.api.model.generated.ConnectionStatus; +import io.airbyte.api.model.generated.Geography; import io.airbyte.api.model.generated.JobType; import io.airbyte.api.model.generated.JobTypeResourceLimit; import io.airbyte.api.model.generated.ResourceRequirements; @@ -91,7 +92,8 @@ public static ConnectionRead internalToConnectionRead(final StandardSync standar .namespaceFormat(standardSync.getNamespaceFormat()) .prefix(standardSync.getPrefix()) .syncCatalog(CatalogConverter.toApi(standardSync.getCatalog())) - .sourceCatalogId(standardSync.getSourceCatalogId()); + .sourceCatalogId(standardSync.getSourceCatalogId()) + .geography(Enums.convertTo(standardSync.getGeography(), Geography.class)); if (standardSync.getResourceRequirements() != null) { connectionRead.resourceRequirements(resourceRequirementsToApi(standardSync.getResourceRequirements())); @@ -127,6 +129,14 @@ public static StandardSync.Status toPersistenceStatus(final ConnectionStatus api return Enums.convertTo(apiStatus, StandardSync.Status.class); } + public static Geography toApiGeography(final io.airbyte.config.Geography geography) { + return Enums.convertTo(geography, Geography.class); + } + + public static io.airbyte.config.Geography toPersistenceGeography(final Geography apiGeography) { + return Enums.convertTo(apiGeography, io.airbyte.config.Geography.class); + } + public static Schedule.TimeUnit toPersistenceTimeUnit(final ConnectionSchedule.TimeUnitEnum apiTimeUnit) { return Enums.convertTo(apiTimeUnit, Schedule.TimeUnit.class); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index c14b8874c69e..b53c40949870 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -29,6 +29,7 @@ import io.airbyte.config.ActorCatalog; import io.airbyte.config.BasicSchedule; import io.airbyte.config.DestinationConnection; +import io.airbyte.config.Geography; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Schedule; import io.airbyte.config.ScheduleData; @@ -37,6 +38,7 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSync.ScheduleType; +import io.airbyte.config.StandardWorkspace; import io.airbyte.config.helpers.ScheduleHelpers; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; @@ -138,7 +140,8 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate) .withDestinationId(connectionCreate.getDestinationId()) .withOperationIds(operationIds) .withStatus(ApiPojoConverters.toPersistenceStatus(connectionCreate.getStatus())) - .withSourceCatalogId(connectionCreate.getSourceCatalogId()); + .withSourceCatalogId(connectionCreate.getSourceCatalogId()) + .withGeography(getGeographyFromConnectionCreateOrWorkspace(connectionCreate)); if (connectionCreate.getResourceRequirements() != null) { standardSync.withResourceRequirements(ApiPojoConverters.resourceRequirementsToInternal(connectionCreate.getResourceRequirements())); } @@ -177,6 +180,25 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate) return buildConnectionRead(connectionId); } + private Geography getGeographyFromConnectionCreateOrWorkspace(final ConnectionCreate connectionCreate) + throws JsonValidationException, ConfigNotFoundException, IOException { + + if (connectionCreate.getGeography() != null) { + return ApiPojoConverters.toPersistenceGeography(connectionCreate.getGeography()); + } + + // connectionCreate didn't specify a geography, so use the workspace default geography if one exists + final UUID workspaceId = workspaceHelper.getWorkspaceForSourceId(connectionCreate.getSourceId()); + final StandardWorkspace workspace = configRepository.getStandardWorkspace(workspaceId, true); + + if (workspace.getDefaultGeography() != null) { + return workspace.getDefaultGeography(); + } + + // if the workspace doesn't have a default geography, default to 'auto' + return Geography.AUTO; + } + private void populateSyncFromLegacySchedule(final StandardSync standardSync, final ConnectionCreate connectionCreate) { if (connectionCreate.getSchedule() != null) { final Schedule schedule = new Schedule() diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 5a34ab913f1a..6a89d1eaa145 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -247,7 +247,8 @@ private static WebBackendConnectionRead getWebBackendConnectionRead(final Connec .source(source) .destination(destination) .operations(operations.getOperations()) - .resourceRequirements(connectionRead.getResourceRequirements()); + .resourceRequirements(connectionRead.getResourceRequirements()) + .geography(connectionRead.getGeography()); } // todo (cgardens) - This logic is a headache to follow it stems from the internal data model not @@ -568,6 +569,7 @@ protected static ConnectionCreate toConnectionCreate(final WebBackendConnectionC connectionCreate.status(webBackendConnectionCreate.getStatus()); connectionCreate.resourceRequirements(webBackendConnectionCreate.getResourceRequirements()); connectionCreate.sourceCatalogId(webBackendConnectionCreate.getSourceCatalogId()); + connectionCreate.geography(webBackendConnectionCreate.getGeography()); return connectionCreate; } @@ -597,6 +599,7 @@ protected static ConnectionUpdate toConnectionPatch(final WebBackendConnectionUp connectionPatch.status(webBackendConnectionPatch.getStatus()); connectionPatch.resourceRequirements(webBackendConnectionPatch.getResourceRequirements()); connectionPatch.sourceCatalogId(webBackendConnectionPatch.getSourceCatalogId()); + connectionPatch.geography(webBackendConnectionPatch.getGeography()); connectionPatch.operationIds(finalOperationIds); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendGeographiesHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendGeographiesHandler.java new file mode 100644 index 000000000000..d8493a744d35 --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendGeographiesHandler.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.handlers; + +import io.airbyte.api.model.generated.Geography; +import io.airbyte.api.model.generated.WebBackendGeographiesListResult; +import java.util.Arrays; +import java.util.Collections; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@AllArgsConstructor +@Slf4j +public class WebBackendGeographiesHandler { + + public WebBackendGeographiesListResult listGeographiesOSS() { + // for now, OSS only supports AUTO. This can evolve to account for complex OSS use cases, but for + // now we expect OSS deployments to use a single default Task Queue for scheduling syncs in a vast + // majority of cases. + return new WebBackendGeographiesListResult().geographies( + Collections.singletonList(Geography.AUTO)); + } + + /** + * Only called by the wrapped Cloud API to enable multi-cloud + */ + public WebBackendGeographiesListResult listGeographiesCloud() { + return new WebBackendGeographiesListResult().geographies(Arrays.asList(Geography.values())); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WorkspacesHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WorkspacesHandler.java index 6a9a5fe09493..47605c5200e5 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WorkspacesHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WorkspacesHandler.java @@ -10,6 +10,7 @@ import io.airbyte.analytics.TrackingClientSingleton; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.DestinationRead; +import io.airbyte.api.model.generated.Geography; import io.airbyte.api.model.generated.Notification; import io.airbyte.api.model.generated.NotificationRead; import io.airbyte.api.model.generated.NotificationRead.StatusEnum; @@ -22,6 +23,7 @@ import io.airbyte.api.model.generated.WorkspaceReadList; import io.airbyte.api.model.generated.WorkspaceUpdate; import io.airbyte.api.model.generated.WorkspaceUpdateName; +import io.airbyte.commons.enums.Enums; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; @@ -79,6 +81,11 @@ public WorkspaceRead createWorkspace(final WorkspaceCreate workspaceCreate) final Boolean securityUpdates = workspaceCreate.getSecurityUpdates(); final Boolean displaySetupWizard = workspaceCreate.getDisplaySetupWizard(); + // if not set on the workspaceCreate, set the defaultGeography to AUTO + final io.airbyte.config.Geography defaultGeography = workspaceCreate.getDefaultGeography() != null + ? Enums.convertTo(workspaceCreate.getDefaultGeography(), io.airbyte.config.Geography.class) + : io.airbyte.config.Geography.AUTO; + final StandardWorkspace workspace = new StandardWorkspace() .withWorkspaceId(uuidSupplier.get()) .withCustomerId(uuidSupplier.get()) @@ -90,7 +97,8 @@ public WorkspaceRead createWorkspace(final WorkspaceCreate workspaceCreate) .withSecurityUpdates(securityUpdates != null ? securityUpdates : false) .withDisplaySetupWizard(displaySetupWizard != null ? displaySetupWizard : false) .withTombstone(false) - .withNotifications(NotificationConverter.toConfigList(workspaceCreate.getNotifications())); + .withNotifications(NotificationConverter.toConfigList(workspaceCreate.getNotifications())) + .withDefaultGeography(defaultGeography); if (!Strings.isNullOrEmpty(email)) { workspace.withEmail(email); @@ -249,7 +257,8 @@ private static WorkspaceRead buildWorkspaceRead(final StandardWorkspace workspac .anonymousDataCollection(workspace.getAnonymousDataCollection()) .news(workspace.getNews()) .securityUpdates(workspace.getSecurityUpdates()) - .notifications(NotificationConverter.toApiList(workspace.getNotifications())); + .notifications(NotificationConverter.toApiList(workspace.getNotifications())) + .defaultGeography(Enums.convertTo(workspace.getDefaultGeography(), Geography.class)); } private void validateWorkspacePatch(final StandardWorkspace persistedWorkspace, final WorkspaceUpdate workspacePatch) { @@ -278,6 +287,10 @@ private void applyPatchToStandardWorkspace(final StandardWorkspace workspace, fi if (workspacePatch.getNotifications() != null) { workspace.setNotifications(NotificationConverter.toConfigList(workspacePatch.getNotifications())); } + if (workspacePatch.getDefaultGeography() != null) { + workspace.setDefaultGeography( + Enums.convertTo(workspacePatch.getDefaultGeography(), io.airbyte.config.Geography.class)); + } } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/ConnectionMatcher.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/ConnectionMatcher.java index 163132502cda..c98f628c12ef 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/ConnectionMatcher.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/ConnectionMatcher.java @@ -43,6 +43,7 @@ public ConnectionRead match(final ConnectionRead query) { fromSearch.syncCatalog(query.getSyncCatalog()); fromSearch.operationIds(query.getOperationIds()); fromSearch.sourceCatalogId(query.getSourceCatalogId()); + fromSearch.geography(query.getGeography()); return fromSearch; } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index 02d6fabbe53d..5bd3d2648caf 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -45,6 +45,7 @@ import io.airbyte.config.Cron; import io.airbyte.config.DataType; import io.airbyte.config.DestinationConnection; +import io.airbyte.config.Geography; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.Schedule; import io.airbyte.config.Schedule.TimeUnit; @@ -55,6 +56,7 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSync.ScheduleType; import io.airbyte.config.StandardSync.Status; +import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.persistence.job.WorkspaceHelper; @@ -147,7 +149,8 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio .withScheduleType(ScheduleType.BASIC_SCHEDULE) .withScheduleData(ConnectionHelpers.generateBasicScheduleData()) .withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS) - .withSourceCatalogId(UUID.randomUUID()); + .withSourceCatalogId(UUID.randomUUID()) + .withGeography(Geography.AUTO); standardSyncDeleted = new StandardSync() .withConnectionId(connectionId) .withName("presto to hudi2") @@ -161,7 +164,8 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio .withOperationIds(List.of(operationId)) .withManual(false) .withSchedule(ConnectionHelpers.generateBasicSchedule()) - .withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS); + .withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS) + .withGeography(Geography.US); configRepository = mock(ConfigRepository.class); uuidGenerator = mock(Supplier.class); @@ -179,13 +183,30 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio class UnMockedConnectionHelper { @BeforeEach - void setUp() { + void setUp() throws JsonValidationException, ConfigNotFoundException, IOException { connectionsHandler = new ConnectionsHandler( configRepository, uuidGenerator, workspaceHelper, trackingClient, eventRunner); + + when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId()); + final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() + .withName(SOURCE_TEST) + .withSourceDefinitionId(UUID.randomUUID()); + final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition() + .withName(DESTINATION_TEST) + .withDestinationDefinitionId(UUID.randomUUID()); + when(configRepository.getStandardSync(standardSync.getConnectionId())).thenReturn(standardSync); + when(configRepository.getSourceDefinitionFromConnection(standardSync.getConnectionId())).thenReturn( + sourceDefinition); + when(configRepository.getDestinationDefinitionFromConnection(standardSync.getConnectionId())).thenReturn( + destinationDefinition); + when(configRepository.getSourceConnection(source.getSourceId())) + .thenReturn(source); + when(configRepository.getDestinationConnection(destination.getDestinationId())) + .thenReturn(destination); } @Nested @@ -193,23 +214,17 @@ class CreateConnection { @Test void testCreateConnection() throws JsonValidationException, ConfigNotFoundException, IOException { - when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId()); - final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() - .withName(SOURCE_TEST) - .withSourceDefinitionId(UUID.randomUUID()); - final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition() - .withName(DESTINATION_TEST) - .withDestinationDefinitionId(UUID.randomUUID()); - when(configRepository.getStandardSync(standardSync.getConnectionId())).thenReturn(standardSync); - when(configRepository.getSourceDefinitionFromConnection(standardSync.getConnectionId())).thenReturn(sourceDefinition); - when(configRepository.getDestinationDefinitionFromConnection(standardSync.getConnectionId())).thenReturn(destinationDefinition); - when(configRepository.getSourceConnection(source.getSourceId())) - .thenReturn(source); - when(configRepository.getDestinationConnection(destination.getDestinationId())) - .thenReturn(destination); final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog(); + // set a defaultGeography on the workspace as EU, but expect connection to be + // created AUTO because the ConnectionCreate geography takes precedence over the workspace + // defaultGeography. + final StandardWorkspace workspace = new StandardWorkspace() + .withWorkspaceId(workspaceId) + .withDefaultGeography(Geography.EU); + when(configRepository.getStandardWorkspace(workspaceId, true)).thenReturn(workspace); + final ConnectionCreate connectionCreate = new ConnectionCreate() .sourceId(standardSync.getSourceId()) .destinationId(standardSync.getDestinationId()) @@ -226,7 +241,8 @@ void testCreateConnection() throws JsonValidationException, ConfigNotFoundExcept .cpuLimit(standardSync.getResourceRequirements().getCpuLimit()) .memoryRequest(standardSync.getResourceRequirements().getMemoryRequest()) .memoryLimit(standardSync.getResourceRequirements().getMemoryLimit())) - .sourceCatalogId(standardSync.getSourceCatalogId()); + .sourceCatalogId(standardSync.getSourceCatalogId()) + .geography(ApiPojoConverters.toApiGeography(standardSync.getGeography())); final ConnectionRead actualConnectionRead = connectionsHandler.createConnection(connectionCreate); @@ -245,13 +261,52 @@ void testCreateConnection() throws JsonValidationException, ConfigNotFoundExcept } @Test - void testValidateConnectionCreateSourceAndDestinationInDifferenceWorkspace() - throws JsonValidationException, ConfigNotFoundException, IOException { + void testCreateConnectionUsesDefaultGeographyFromWorkspace() throws JsonValidationException, ConfigNotFoundException, IOException { + + when(workspaceHelper.getWorkspaceForSourceId(sourceId)).thenReturn(workspaceId); + + final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog(); + + // don't set a geography on the ConnectionCreate to force inheritance from workspace default + final ConnectionCreate connectionCreate = new ConnectionCreate() + .sourceId(standardSync.getSourceId()) + .destinationId(standardSync.getDestinationId()) + .operationIds(standardSync.getOperationIds()) + .name(PRESTO_TO_HUDI) + .namespaceDefinition(NamespaceDefinitionType.SOURCE) + .namespaceFormat(null) + .prefix(PRESTO_TO_HUDI_PREFIX) + .status(ConnectionStatus.ACTIVE) + .schedule(ConnectionHelpers.generateBasicConnectionSchedule()) + .syncCatalog(catalog) + .resourceRequirements(new io.airbyte.api.model.generated.ResourceRequirements() + .cpuRequest(standardSync.getResourceRequirements().getCpuRequest()) + .cpuLimit(standardSync.getResourceRequirements().getCpuLimit()) + .memoryRequest(standardSync.getResourceRequirements().getMemoryRequest()) + .memoryLimit(standardSync.getResourceRequirements().getMemoryLimit())) + .sourceCatalogId(standardSync.getSourceCatalogId()); + + // set the workspace default to EU + final StandardWorkspace workspace = new StandardWorkspace() + .withWorkspaceId(workspaceId) + .withDefaultGeography(Geography.EU); + when(configRepository.getStandardWorkspace(workspaceId, true)).thenReturn(workspace); + + // the expected read and verified write is generated from the standardSync, so set this to EU as + // well + standardSync.setGeography(Geography.EU); + + final ConnectionRead expectedConnectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); + final ConnectionRead actualConnectionRead = connectionsHandler.createConnection(connectionCreate); + + assertEquals(expectedConnectionRead, actualConnectionRead); + verify(configRepository).writeStandardSync(standardSync); + } + + @Test + void testValidateConnectionCreateSourceAndDestinationInDifferenceWorkspace() { + when(workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(destinationId)).thenReturn(UUID.randomUUID()); - when(configRepository.getSourceConnection(source.getSourceId())) - .thenReturn(source); - when(configRepository.getDestinationConnection(destination.getDestinationId())) - .thenReturn(destination); final ConnectionCreate connectionCreate = new ConnectionCreate() .sourceId(standardSync.getSourceId()) @@ -261,12 +316,9 @@ void testValidateConnectionCreateSourceAndDestinationInDifferenceWorkspace() } @Test - void testValidateConnectionCreateOperationInDifferentWorkspace() throws JsonValidationException, ConfigNotFoundException, IOException { + void testValidateConnectionCreateOperationInDifferentWorkspace() { + when(workspaceHelper.getWorkspaceForOperationIdIgnoreExceptions(operationId)).thenReturn(UUID.randomUUID()); - when(configRepository.getSourceConnection(source.getSourceId())) - .thenReturn(source); - when(configRepository.getDestinationConnection(destination.getDestinationId())) - .thenReturn(destination); final ConnectionCreate connectionCreate = new ConnectionCreate() .sourceId(standardSync.getSourceId()) @@ -278,20 +330,10 @@ void testValidateConnectionCreateOperationInDifferentWorkspace() throws JsonVali @Test void testCreateConnectionWithBadDefinitionIds() throws JsonValidationException, ConfigNotFoundException, IOException { - when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId()); + final UUID sourceIdBad = UUID.randomUUID(); final UUID destinationIdBad = UUID.randomUUID(); - final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() - .withName(SOURCE_TEST) - .withSourceDefinitionId(UUID.randomUUID()); - final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition() - .withName(DESTINATION_TEST) - .withDestinationDefinitionId(UUID.randomUUID()); - when(configRepository.getStandardSync(standardSync.getConnectionId())).thenReturn(standardSync); - when(configRepository.getSourceDefinitionFromConnection(standardSync.getConnectionId())).thenReturn(sourceDefinition); - when(configRepository.getDestinationDefinitionFromConnection(standardSync.getConnectionId())).thenReturn(destinationDefinition); - when(configRepository.getSourceConnection(sourceIdBad)) .thenThrow(new ConfigNotFoundException(ConfigSchema.SOURCE_CONNECTION, sourceIdBad)); when(configRepository.getDestinationConnection(destinationIdBad)) @@ -345,7 +387,8 @@ void testUpdateConnectionPatchSingleField() throws Exception { standardSync.getSourceId(), standardSync.getDestinationId(), standardSync.getOperationIds(), - standardSync.getSourceCatalogId()) + standardSync.getSourceCatalogId(), + ApiPojoConverters.toApiGeography(standardSync.getGeography())) .name("newName"); final StandardSync expectedPersistedSync = Jsons.clone(standardSync).withName("newName"); @@ -369,7 +412,8 @@ void testUpdateConnectionPatchScheduleToManual() throws Exception { standardSync.getSourceId(), standardSync.getDestinationId(), standardSync.getOperationIds(), - standardSync.getSourceCatalogId()) + standardSync.getSourceCatalogId(), + ApiPojoConverters.toApiGeography(standardSync.getGeography())) .schedule(null) .scheduleType(ConnectionScheduleType.MANUAL) .scheduleData(null); @@ -405,7 +449,8 @@ void testUpdateConnectionPatchScheduleToCron() throws Exception { standardSync.getSourceId(), standardSync.getDestinationId(), standardSync.getOperationIds(), - standardSync.getSourceCatalogId()) + standardSync.getSourceCatalogId(), + ApiPojoConverters.toApiGeography(standardSync.getGeography())) .schedule(null) .scheduleType(ConnectionScheduleType.CRON) .scheduleData(cronScheduleData); @@ -441,7 +486,8 @@ void testUpdateConnectionPatchBasicSchedule() throws Exception { standardSync.getSourceId(), standardSync.getDestinationId(), standardSync.getOperationIds(), - standardSync.getSourceCatalogId()) + standardSync.getSourceCatalogId(), + ApiPojoConverters.toApiGeography(standardSync.getGeography())) .schedule(new ConnectionSchedule().timeUnit(ConnectionSchedule.TimeUnitEnum.DAYS).units(10L)) // still dual-writing to legacy field .scheduleType(ConnectionScheduleType.BASIC) .scheduleData(newScheduleData); @@ -490,7 +536,8 @@ void testUpdateConnectionPatchAddingNewStream() throws Exception { standardSync.getSourceId(), standardSync.getDestinationId(), standardSync.getOperationIds(), - standardSync.getSourceCatalogId()) + standardSync.getSourceCatalogId(), + ApiPojoConverters.toApiGeography(standardSync.getGeography())) .syncCatalog(catalogForUpdate); final StandardSync expectedPersistedSync = Jsons.clone(standardSync) @@ -532,7 +579,8 @@ void testUpdateConnectionPatchEditExistingStreamWhileAddingNewStream() throws Ex standardSync.getSourceId(), standardSync.getDestinationId(), standardSync.getOperationIds(), - standardSync.getSourceCatalogId()) + standardSync.getSourceCatalogId(), + ApiPojoConverters.toApiGeography(standardSync.getGeography())) .syncCatalog(catalogForUpdate); final StandardSync expectedPersistedSync = Jsons.clone(standardSync) @@ -602,7 +650,8 @@ void testUpdateConnectionPatchingSeveralFieldsAndReplaceAStream() throws JsonVal standardSync.getSourceId(), standardSync.getDestinationId(), standardSync.getOperationIds(), - newSourceCatalogId) + newSourceCatalogId, + ApiPojoConverters.toApiGeography(standardSync.getGeography())) .status(ConnectionStatus.INACTIVE) .scheduleType(ConnectionScheduleType.MANUAL) .scheduleData(null) @@ -695,7 +744,8 @@ void testSearchConnections() throws JsonValidationException, ConfigNotFoundExcep .withDestinationId(destinationId) .withOperationIds(List.of(operationId)) .withManual(true) - .withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS); + .withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS) + .withGeography(Geography.US); final ConnectionRead connectionRead2 = ConnectionHelpers.connectionReadFromStandardSync(standardSync2); final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() .withName(SOURCE_TEST) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 49b4f2d09c47..2c02bb38d679 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -36,6 +36,7 @@ import io.airbyte.api.model.generated.DestinationIdRequestBody; import io.airbyte.api.model.generated.DestinationRead; import io.airbyte.api.model.generated.DestinationSyncMode; +import io.airbyte.api.model.generated.Geography; import io.airbyte.api.model.generated.JobConfigType; import io.airbyte.api.model.generated.JobInfoRead; import io.airbyte.api.model.generated.JobRead; @@ -403,7 +404,8 @@ void testToConnectionCreate() throws IOException { .status(ConnectionStatus.INACTIVE) .schedule(schedule) .syncCatalog(catalog) - .sourceCatalogId(sourceCatalogId); + .sourceCatalogId(sourceCatalogId) + .geography(Geography.US); final List operationIds = List.of(newOperationId); @@ -418,7 +420,8 @@ void testToConnectionCreate() throws IOException { .status(ConnectionStatus.INACTIVE) .schedule(schedule) .syncCatalog(catalog) - .sourceCatalogId(sourceCatalogId); + .sourceCatalogId(sourceCatalogId) + .geography(Geography.US); final ConnectionCreate actual = WebBackendConnectionsHandler.toConnectionCreate(input, operationIds); @@ -426,7 +429,7 @@ void testToConnectionCreate() throws IOException { } @Test - void testToConnectionUpdate() throws IOException { + void testToConnectionPatch() throws IOException { final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final StandardSync standardSync = ConnectionHelpers.generateSyncWithSourceId(source.getSourceId()); @@ -445,7 +448,8 @@ void testToConnectionUpdate() throws IOException { .status(ConnectionStatus.INACTIVE) .schedule(schedule) .name(standardSync.getName()) - .syncCatalog(catalog); + .syncCatalog(catalog) + .geography(Geography.US); final List operationIds = List.of(newOperationId); @@ -458,7 +462,8 @@ void testToConnectionUpdate() throws IOException { .status(ConnectionStatus.INACTIVE) .schedule(schedule) .name(standardSync.getName()) - .syncCatalog(catalog); + .syncCatalog(catalog) + .geography(Geography.US); final ConnectionUpdate actual = WebBackendConnectionsHandler.toConnectionPatch(input, operationIds); @@ -468,8 +473,9 @@ void testToConnectionUpdate() throws IOException { @Test void testForConnectionCreateCompleteness() { final Set handledMethods = - Set.of("name", "namespaceDefinition", "namespaceFormat", "prefix", "sourceId", "destinationId", "operationIds", "addOperationIdsItem", - "removeOperationIdsItem", "syncCatalog", "schedule", "scheduleType", "scheduleData", "status", "resourceRequirements", "sourceCatalogId"); + Set.of("name", "namespaceDefinition", "namespaceFormat", "prefix", "sourceId", "destinationId", "operationIds", + "addOperationIdsItem", "removeOperationIdsItem", "syncCatalog", "schedule", "scheduleType", "scheduleData", + "status", "resourceRequirements", "sourceCatalogId", "geography"); final Set methods = Arrays.stream(ConnectionCreate.class.getMethods()) .filter(method -> method.getReturnType() == ConnectionCreate.class) @@ -487,10 +493,11 @@ void testForConnectionCreateCompleteness() { } @Test - void testForConnectionUpdateCompleteness() { + void testForConnectionPatchCompleteness() { final Set handledMethods = - Set.of("schedule", "connectionId", "syncCatalog", "namespaceDefinition", "namespaceFormat", "prefix", "status", "operationIds", - "addOperationIdsItem", "removeOperationIdsItem", "resourceRequirements", "name", "sourceCatalogId", "scheduleType", "scheduleData"); + Set.of("schedule", "connectionId", "syncCatalog", "namespaceDefinition", "namespaceFormat", "prefix", "status", + "operationIds", "addOperationIdsItem", "removeOperationIdsItem", "resourceRequirements", "name", + "sourceCatalogId", "scheduleType", "scheduleData", "geography"); final Set methods = Arrays.stream(ConnectionUpdate.class.getMethods()) .filter(method -> method.getReturnType() == ConnectionUpdate.class) @@ -501,8 +508,8 @@ void testForConnectionUpdateCompleteness() { """ If this test is failing, it means you added a field to ConnectionUpdate! Congratulations, but you're not done yet.. - \tYou should update WebBackendConnectionsHandler::toConnectionUpdate - \tand ensure that the field is tested in WebBackendConnectionsHandlerTest::testToConnectionUpdate + \tYou should update WebBackendConnectionsHandler::toConnectionPatch + \tand ensure that the field is tested in WebBackendConnectionsHandlerTest::testToConnectionPatch Then you can add the field name here to make this test pass. Cheers!"""; assertEquals(handledMethods, methods, message); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendGeographiesHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendGeographiesHandlerTest.java new file mode 100644 index 000000000000..e4f014aa3c1d --- /dev/null +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendGeographiesHandlerTest.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.handlers; + +import io.airbyte.api.model.generated.Geography; +import io.airbyte.api.model.generated.WebBackendGeographiesListResult; +import java.util.List; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class WebBackendGeographiesHandlerTest { + + private WebBackendGeographiesHandler webBackendGeographiesHandler; + + @BeforeEach + void setUp() { + webBackendGeographiesHandler = new WebBackendGeographiesHandler(); + } + + @Test + void testListGeographiesOSS() { + final WebBackendGeographiesListResult expected = new WebBackendGeographiesListResult().geographies( + List.of(Geography.AUTO)); + + final WebBackendGeographiesListResult actual = webBackendGeographiesHandler.listGeographiesOSS(); + + Assertions.assertEquals(expected, actual); + } + + @Test + void testListGeographiesCloud() { + final WebBackendGeographiesListResult expected = new WebBackendGeographiesListResult().geographies( + List.of(Geography.AUTO, Geography.US, Geography.EU)); + + final WebBackendGeographiesListResult actual = webBackendGeographiesHandler.listGeographiesCloud(); + + Assertions.assertEquals(expected, actual); + } + +} diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WorkspacesHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WorkspacesHandlerTest.java index 6f6c11d0a7dc..31879030da0d 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WorkspacesHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WorkspacesHandlerTest.java @@ -30,6 +30,7 @@ import io.airbyte.api.model.generated.WorkspaceUpdate; import io.airbyte.api.model.generated.WorkspaceUpdateName; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.Geography; import io.airbyte.config.Notification; import io.airbyte.config.Notification.NotificationType; import io.airbyte.config.SlackNotificationConfiguration; @@ -64,6 +65,11 @@ class WorkspacesHandlerTest { private static final String TEST_WORKSPACE_NAME = "test workspace"; private static final String TEST_WORKSPACE_SLUG = "test-workspace"; + private static final io.airbyte.api.model.generated.Geography GEOGRAPHY_AUTO = + io.airbyte.api.model.generated.Geography.AUTO; + private static final io.airbyte.api.model.generated.Geography GEOGRAPHY_US = + io.airbyte.api.model.generated.Geography.US; + @SuppressWarnings("unchecked") @BeforeEach void setUp() { @@ -89,7 +95,8 @@ private StandardWorkspace generateWorkspace() { .withAnonymousDataCollection(false) .withSecurityUpdates(false) .withTombstone(false) - .withNotifications(List.of(generateNotification())); + .withNotifications(List.of(generateNotification())) + .withDefaultGeography(Geography.AUTO); } private Notification generateNotification() { @@ -121,7 +128,8 @@ void testCreateWorkspace() throws JsonValidationException, IOException { .news(false) .anonymousDataCollection(false) .securityUpdates(false) - .notifications(List.of(generateApiNotification())); + .notifications(List.of(generateApiNotification())) + .defaultGeography(GEOGRAPHY_US); final WorkspaceRead actualRead = workspacesHandler.createWorkspace(workspaceCreate); final WorkspaceRead expectedRead = new WorkspaceRead() @@ -135,7 +143,8 @@ void testCreateWorkspace() throws JsonValidationException, IOException { .news(false) .anonymousDataCollection(false) .securityUpdates(false) - .notifications(List.of(generateApiNotification())); + .notifications(List.of(generateApiNotification())) + .defaultGeography(GEOGRAPHY_US); assertEquals(expectedRead, actualRead); } @@ -172,7 +181,8 @@ void testCreateWorkspaceDuplicateSlug() throws JsonValidationException, IOExcept .news(false) .anonymousDataCollection(false) .securityUpdates(false) - .notifications(Collections.emptyList()); + .notifications(Collections.emptyList()) + .defaultGeography(GEOGRAPHY_AUTO); assertTrue(actualRead.getSlug().startsWith(workspace.getSlug())); assertNotEquals(workspace.getSlug(), actualRead.getSlug()); @@ -231,7 +241,8 @@ void testListWorkspaces() throws JsonValidationException, IOException { .news(workspace.getNews()) .anonymousDataCollection(workspace.getAnonymousDataCollection()) .securityUpdates(workspace.getSecurityUpdates()) - .notifications(List.of(generateApiNotification())); + .notifications(List.of(generateApiNotification())) + .defaultGeography(GEOGRAPHY_AUTO); final WorkspaceRead expectedWorkspaceRead2 = new WorkspaceRead() .workspaceId(workspace2.getWorkspaceId()) @@ -244,7 +255,8 @@ void testListWorkspaces() throws JsonValidationException, IOException { .news(workspace2.getNews()) .anonymousDataCollection(workspace2.getAnonymousDataCollection()) .securityUpdates(workspace2.getSecurityUpdates()) - .notifications(List.of(generateApiNotification())); + .notifications(List.of(generateApiNotification())) + .defaultGeography(GEOGRAPHY_AUTO); final WorkspaceReadList actualWorkspaceReadList = workspacesHandler.listWorkspaces(); @@ -269,7 +281,8 @@ void testGetWorkspace() throws JsonValidationException, ConfigNotFoundException, .news(false) .anonymousDataCollection(false) .securityUpdates(false) - .notifications(List.of(generateApiNotification())); + .notifications(List.of(generateApiNotification())) + .defaultGeography(GEOGRAPHY_AUTO); assertEquals(workspaceRead, workspacesHandler.getWorkspace(workspaceIdRequestBody)); } @@ -290,7 +303,8 @@ void testGetWorkspaceBySlug() throws JsonValidationException, ConfigNotFoundExce .news(workspace.getNews()) .anonymousDataCollection(workspace.getAnonymousDataCollection()) .securityUpdates(workspace.getSecurityUpdates()) - .notifications(NotificationConverter.toApiList(workspace.getNotifications())); + .notifications(NotificationConverter.toApiList(workspace.getNotifications())) + .defaultGeography(GEOGRAPHY_AUTO); assertEquals(workspaceRead, workspacesHandler.getWorkspaceBySlug(slugRequestBody)); } @@ -306,7 +320,8 @@ void testUpdateWorkspace() throws JsonValidationException, ConfigNotFoundExcepti .news(false) .initialSetupComplete(true) .displaySetupWizard(false) - .notifications(List.of(apiNotification)); + .notifications(List.of(apiNotification)) + .defaultGeography(GEOGRAPHY_US); final Notification expectedNotification = generateNotification(); expectedNotification.getSlackConfiguration().withWebhook("updated"); @@ -322,7 +337,8 @@ void testUpdateWorkspace() throws JsonValidationException, ConfigNotFoundExcepti .withInitialSetupComplete(true) .withDisplaySetupWizard(false) .withTombstone(false) - .withNotifications(List.of(expectedNotification)); + .withNotifications(List.of(expectedNotification)) + .withDefaultGeography(Geography.US); when(configRepository.getStandardWorkspace(workspace.getWorkspaceId(), false)) .thenReturn(workspace) @@ -343,7 +359,8 @@ void testUpdateWorkspace() throws JsonValidationException, ConfigNotFoundExcepti .news(false) .anonymousDataCollection(true) .securityUpdates(false) - .notifications(List.of(expectedNotificationRead)); + .notifications(List.of(expectedNotificationRead)) + .defaultGeography(GEOGRAPHY_US); verify(configRepository).writeStandardWorkspace(expectedWorkspace); @@ -369,7 +386,8 @@ void testUpdateWorkspaceNoNameUpdate() throws JsonValidationException, ConfigNot .withInitialSetupComplete(workspace.getInitialSetupComplete()) .withDisplaySetupWizard(workspace.getDisplaySetupWizard()) .withTombstone(false) - .withNotifications(workspace.getNotifications()); + .withNotifications(workspace.getNotifications()) + .withDefaultGeography(Geography.AUTO); when(configRepository.getStandardWorkspace(workspace.getWorkspaceId(), false)) .thenReturn(workspace) @@ -388,7 +406,8 @@ void testUpdateWorkspaceNoNameUpdate() throws JsonValidationException, ConfigNot .news(workspace.getNews()) .anonymousDataCollection(workspace.getAnonymousDataCollection()) .securityUpdates(workspace.getSecurityUpdates()) - .notifications(List.of(generateApiNotification())); + .notifications(List.of(generateApiNotification())) + .defaultGeography(GEOGRAPHY_AUTO); verify(configRepository).writeStandardWorkspace(expectedWorkspace); @@ -420,7 +439,8 @@ void testWorkspacePatchUpdate() throws JsonValidationException, ConfigNotFoundEx .news(workspace.getNews()) .anonymousDataCollection(true) .securityUpdates(workspace.getSecurityUpdates()) - .notifications(NotificationConverter.toApiList(workspace.getNotifications())); + .notifications(NotificationConverter.toApiList(workspace.getNotifications())) + .defaultGeography(GEOGRAPHY_AUTO); final WorkspaceRead actualWorkspaceRead = workspacesHandler.updateWorkspace(workspaceUpdate); verify(configRepository).writeStandardWorkspace(expectedWorkspace); diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java index c6bac34a1e33..43e0b155e00a 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java @@ -18,11 +18,13 @@ import io.airbyte.api.model.generated.ConnectionScheduleType; import io.airbyte.api.model.generated.ConnectionStatus; import io.airbyte.api.model.generated.DestinationRead; +import io.airbyte.api.model.generated.Geography; import io.airbyte.api.model.generated.JobStatus; import io.airbyte.api.model.generated.ResourceRequirements; import io.airbyte.api.model.generated.SourceRead; import io.airbyte.api.model.generated.SyncMode; import io.airbyte.api.model.generated.WebBackendConnectionListItem; +import io.airbyte.commons.enums.Enums; import io.airbyte.commons.text.Names; import io.airbyte.config.BasicSchedule; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; @@ -147,7 +149,8 @@ public static ConnectionRead generateExpectedConnectionRead(final UUID connectio final UUID sourceId, final UUID destinationId, final List operationIds, - final UUID sourceCatalogId) { + final UUID sourceCatalogId, + final Geography geography) { return new ConnectionRead() .connectionId(connectionId) @@ -168,7 +171,8 @@ public static ConnectionRead generateExpectedConnectionRead(final UUID connectio .cpuLimit(TESTING_RESOURCE_REQUIREMENTS.getCpuLimit()) .memoryRequest(TESTING_RESOURCE_REQUIREMENTS.getMemoryRequest()) .memoryLimit(TESTING_RESOURCE_REQUIREMENTS.getMemoryLimit())) - .sourceCatalogId(sourceCatalogId); + .sourceCatalogId(sourceCatalogId) + .geography(geography); } public static ConnectionRead generateExpectedConnectionRead(final StandardSync standardSync) { @@ -177,7 +181,8 @@ public static ConnectionRead generateExpectedConnectionRead(final StandardSync s standardSync.getSourceId(), standardSync.getDestinationId(), standardSync.getOperationIds(), - standardSync.getSourceCatalogId()); + standardSync.getSourceCatalogId(), + Enums.convertTo(standardSync.getGeography(), Geography.class)); if (standardSync.getSchedule() == null) { connectionRead.schedule(null); @@ -200,7 +205,8 @@ public static ConnectionRead connectionReadFromStandardSync(final StandardSync s .name(standardSync.getName()) .namespaceFormat(standardSync.getNamespaceFormat()) .prefix(standardSync.getPrefix()) - .sourceCatalogId(standardSync.getSourceCatalogId()); + .sourceCatalogId(standardSync.getSourceCatalogId()) + .geography(ApiPojoConverters.toApiGeography(standardSync.getGeography())); if (standardSync.getNamespaceDefinition() != null) { connectionRead diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 3dd1afae62a9..6149700e1fe0 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -369,6 +369,7 @@

WebBackend

  • post /v1/web_backend/connections/get
  • post /v1/web_backend/workspace/state
  • post /v1/web_backend/connections/list
  • +
  • post /v1/web_backend/geographies/list
  • post /v1/web_backend/connections/update
  • Workspace

    @@ -8831,6 +8832,49 @@

    422

    InvalidInputExceptionInfo
    +
    +
    + Up +
    post /v1/web_backend/geographies/list
    +
    Returns available geographies can be selected to run data syncs in a particular geography. +The 'auto' entry indicates that the sync will be automatically assigned to a geography according +to the platform default behavior. Entries other than 'auto' are two-letter country codes that +follow the ISO 3166-1 alpha-2 standard. (webBackendListGeographies)
    +
    Returns all available geographies in which a data sync can run.
    + + + + + + + +

    Return type

    + + + + +

    Example data

    +
    Content-Type: application/json
    +
    {
    +  "geographies" : [ null, null ]
    +}
    + +

    Produces

    + This API call produces the following media types according to the Accept request header; + the media type will be conveyed by the Content-Type response header. +
      +
    • application/json
    • +
    + +

    Responses

    +

    200

    + Successful operation + WebBackendGeographiesListResult +
    +
    @@ -10073,6 +10120,7 @@

    ConnectionRead - status

    resourceRequirements (optional)
    sourceCatalogId (optional)
    UUID format: uuid
    +
    geography (optional)
    @@ -10191,6 +10239,7 @@

    ConnectionUpdate - status (optional)

    resourceRequirements (optional)
    sourceCatalogId (optional)
    UUID format: uuid
    +
    geography (optional)
    +
    +

    Geography - Up

    +
    +
    +
    +

    GlobalState - Up

    @@ -11185,6 +11240,7 @@

    WebBackendConnectionCreate
    resourceRequirements (optional)
    operations (optional)
    sourceCatalogId (optional)
    UUID format: uuid
    +
    geography (optional)

    @@ -11231,6 +11287,7 @@

    WebBackendConnectionRead - <
    resourceRequirements (optional)
    catalogId (optional)
    UUID format: uuid
    catalogDiff (optional)
    +
    geography (optional)

    @@ -11266,6 +11323,14 @@

    WebBackendConnectionUpdate
    skipReset (optional)
    operations (optional)
    sourceCatalogId (optional)
    UUID format: uuid
    +
    geography (optional)
    +

    + +
    notifications (optional)
    displaySetupWizard (optional)
    +
    defaultGeography (optional)
    firstCompletedSync (optional)
    feedbackDone (optional)
    +
    defaultGeography (optional)
    @@ -11359,6 +11426,7 @@

    WorkspaceUpdate - news (optional)

    securityUpdates (optional)
    notifications (optional)
    +
    defaultGeography (optional)