From 3b85a74723719f71db9285eaf4820e98e618d964 Mon Sep 17 00:00:00 2001 From: Michael Siega <109092231+mfsiega-airbyte@users.noreply.github.com> Date: Wed, 3 Aug 2022 18:37:21 +0200 Subject: [PATCH] Dual write old and new schedule schemas (#15039) * dual write old and new schedule schemas * validate that the old and new schedule types match --- .../config/helpers/ScheduleHelpers.java | 10 +++++++ .../DatabaseConfigPersistence.java | 18 ++++++++++++ .../server/converters/ApiPojoConverters.java | 5 ++++ .../server/handlers/ConnectionsHandler.java | 12 ++++++++ .../handlers/ConnectionsHandlerTest.java | 4 +++ .../server/helpers/ConnectionHelpers.java | 10 +++++++ .../workers/helper/ConnectionHelper.java | 28 +++++++++++++++++++ 7 files changed, 87 insertions(+) diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/ScheduleHelpers.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/ScheduleHelpers.java index 113738033741..fe984520041b 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/ScheduleHelpers.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/ScheduleHelpers.java @@ -6,6 +6,8 @@ import io.airbyte.config.BasicSchedule; import io.airbyte.config.Schedule; +import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSync.ScheduleType; import java.util.concurrent.TimeUnit; @SuppressWarnings("PMD.AvoidThrowingRawExceptionTypes") @@ -53,4 +55,12 @@ public static Long getIntervalInSecond(final BasicSchedule schedule) { return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits(); } + public static boolean isScheduleTypeMismatch(final StandardSync standardSync) { + if (standardSync.getScheduleType() == null) { + return false; + } + return (standardSync.getManual() && standardSync.getScheduleType() != ScheduleType.MANUAL) || (!standardSync.getManual() + && standardSync.getScheduleType() == ScheduleType.MANUAL); + } + } diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index 228f73bf195e..585591148491 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -47,6 +47,7 @@ import io.airbyte.config.StandardWorkspace; import io.airbyte.config.State; import io.airbyte.config.WorkspaceServiceAccount; +import io.airbyte.config.helpers.ScheduleHelpers; import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; import io.airbyte.db.Database; import io.airbyte.db.ExceptionWrappingDatabase; @@ -617,6 +618,9 @@ private List> listStandardSyncWithMetadata(fina final List> standardSyncs = new ArrayList<>(); for (final Record record : result) { final StandardSync standardSync = DbConverter.buildStandardSync(record, connectionOperationIds(record.get(CONNECTION.ID))); + if (ScheduleHelpers.isScheduleTypeMismatch(standardSync)) { + throw new RuntimeException("unexpected schedule type mismatch"); + } standardSyncs.add(new ConfigWithMetadata<>( record.get(CONNECTION.ID).toString(), ConfigSchema.STANDARD_SYNC.name(), @@ -1080,6 +1084,10 @@ private void writeStandardSync(final List configs, final DSLContex .from(CONNECTION) .where(CONNECTION.ID.eq(standardSync.getConnectionId()))); + if (ScheduleHelpers.isScheduleTypeMismatch(standardSync)) { + throw new RuntimeException("unexpected schedule type mismatch"); + } + if (isExistingConfig) { ctx.update(CONNECTION) .set(CONNECTION.ID, standardSync.getConnectionId()) @@ -1096,6 +1104,11 @@ private void writeStandardSync(final List configs, final DSLContex io.airbyte.db.instance.configs.jooq.generated.enums.StatusType.class).orElseThrow()) .set(CONNECTION.SCHEDULE, JSONB.valueOf(Jsons.serialize(standardSync.getSchedule()))) .set(CONNECTION.MANUAL, standardSync.getManual()) + .set(CONNECTION.SCHEDULE_TYPE, + standardSync.getScheduleType() == null ? null + : Enums.toEnum(standardSync.getScheduleType().value(), io.airbyte.db.instance.configs.jooq.generated.enums.ScheduleType.class) + .orElseThrow()) + .set(CONNECTION.SCHEDULE_DATA, JSONB.valueOf(Jsons.serialize(standardSync.getScheduleData()))) .set(CONNECTION.RESOURCE_REQUIREMENTS, JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements()))) .set(CONNECTION.UPDATED_AT, timestamp) .set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId()) @@ -1130,6 +1143,11 @@ private void writeStandardSync(final List configs, final DSLContex io.airbyte.db.instance.configs.jooq.generated.enums.StatusType.class).orElseThrow()) .set(CONNECTION.SCHEDULE, JSONB.valueOf(Jsons.serialize(standardSync.getSchedule()))) .set(CONNECTION.MANUAL, standardSync.getManual()) + .set(CONNECTION.SCHEDULE_TYPE, + standardSync.getScheduleType() == null ? null + : Enums.toEnum(standardSync.getScheduleType().value(), io.airbyte.db.instance.configs.jooq.generated.enums.ScheduleType.class) + .orElseThrow()) + .set(CONNECTION.SCHEDULE_DATA, JSONB.valueOf(Jsons.serialize(standardSync.getScheduleData()))) .set(CONNECTION.RESOURCE_REQUIREMENTS, JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements()))) .set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId()) .set(CONNECTION.CREATED_AT, timestamp) diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java b/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java index 725da206ce68..0ce5af388239 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java @@ -13,6 +13,7 @@ import io.airbyte.api.model.generated.JobTypeResourceLimit; import io.airbyte.api.model.generated.ResourceRequirements; import io.airbyte.commons.enums.Enums; +import io.airbyte.config.BasicSchedule; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Schedule; import io.airbyte.config.StandardSync; @@ -164,4 +165,8 @@ public static Schedule.TimeUnit toPersistenceTimeUnit(final ConnectionSchedule.T return Enums.convertTo(apiTimeUnit, Schedule.TimeUnit.class); } + public static BasicSchedule.TimeUnit toBasicScheduleTimeUnit(final ConnectionSchedule.TimeUnitEnum apiTimeUnit) { + return Enums.convertTo(apiTimeUnit, BasicSchedule.TimeUnit.class); + } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index 54d01cdae495..691a3546a976 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -26,13 +26,16 @@ import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ActorCatalog; +import io.airbyte.config.BasicSchedule; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Schedule; +import io.airbyte.config.ScheduleData; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSync.ScheduleType; import io.airbyte.config.helpers.ScheduleHelpers; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; @@ -141,11 +144,20 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate) final Schedule schedule = new Schedule() .withTimeUnit(ApiPojoConverters.toPersistenceTimeUnit(connectionCreate.getSchedule().getTimeUnit())) .withUnits(connectionCreate.getSchedule().getUnits()); + // Populate the legacy field. + // TODO(https://github.com/airbytehq/airbyte/issues/11432): remove. standardSync .withManual(false) .withSchedule(schedule); + // Also write into the new field. This one will be consumed if populated. + standardSync + .withScheduleType(ScheduleType.BASIC_SCHEDULE); + standardSync.withScheduleData(new ScheduleData().withBasicSchedule( + new BasicSchedule().withTimeUnit(ApiPojoConverters.toBasicScheduleTimeUnit(connectionCreate.getSchedule().getTimeUnit())) + .withUnits(connectionCreate.getSchedule().getUnits()))); } else { standardSync.withManual(true); + standardSync.withScheduleType(ScheduleType.MANUAL); } configRepository.writeStandardSync(standardSync); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index af054de1209c..bcdadc52f8a6 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -44,6 +44,7 @@ import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSync.ScheduleType; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -116,6 +117,8 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio .withOperationIds(List.of(operationId)) .withManual(false) .withSchedule(ConnectionHelpers.generateBasicSchedule()) + .withScheduleType(ScheduleType.BASIC_SCHEDULE) + .withScheduleData(ConnectionHelpers.generateBasicScheduleData()) .withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS) .withSourceCatalogId(UUID.randomUUID()); standardSyncDeleted = new StandardSync() @@ -328,6 +331,7 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept .withStatus(StandardSync.Status.INACTIVE) .withCatalog(configuredCatalog) .withManual(true) + .withScheduleType(ScheduleType.MANUAL) .withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS) .withSourceCatalogId(newSourceCatalogId); diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java index 9122b5debac3..e3c4ad38b78e 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java @@ -17,9 +17,11 @@ import io.airbyte.api.model.generated.ResourceRequirements; import io.airbyte.api.model.generated.SyncMode; import io.airbyte.commons.text.Names; +import io.airbyte.config.BasicSchedule; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Schedule; import io.airbyte.config.Schedule.TimeUnit; +import io.airbyte.config.ScheduleData; import io.airbyte.config.StandardSync; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -41,6 +43,8 @@ public class ConnectionHelpers { private static final String FIELD_NAME = "id"; private static final String BASIC_SCHEDULE_TIME_UNIT = "days"; private static final long BASIC_SCHEDULE_UNITS = 1L; + private static final String BASIC_SCHEDULE_DATA_TIME_UNITS = "days"; + private static final long BASIC_SCHEDULE_DATA_UNITS = 1L; public static final StreamDescriptor STREAM_DESCRIPTOR = new StreamDescriptor().withName(STREAM_NAME); @@ -100,6 +104,12 @@ public static Schedule generateBasicSchedule() { .withUnits(BASIC_SCHEDULE_UNITS); } + public static ScheduleData generateBasicScheduleData() { + return new ScheduleData().withBasicSchedule(new BasicSchedule() + .withTimeUnit(BasicSchedule.TimeUnit.fromValue((BASIC_SCHEDULE_DATA_TIME_UNITS))) + .withUnits(BASIC_SCHEDULE_DATA_UNITS)); + } + public static ConnectionRead generateExpectedConnectionRead(final UUID connectionId, final UUID sourceId, final UUID destinationId, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/helper/ConnectionHelper.java b/airbyte-workers/src/main/java/io/airbyte/workers/helper/ConnectionHelper.java index 9450e8cae1d7..ad3eecea35eb 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/helper/ConnectionHelper.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/helper/ConnectionHelper.java @@ -7,9 +7,12 @@ import com.google.common.base.Preconditions; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.BasicSchedule; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Schedule; +import io.airbyte.config.ScheduleData; import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSync.ScheduleType; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.scheduler.persistence.WorkspaceHelper; @@ -89,8 +92,15 @@ public static StandardSync updateConnectionObject(final WorkspaceHelper workspac .withTimeUnit(update.getSchedule().getTimeUnit()) .withUnits(update.getSchedule().getUnits()); newConnection.withManual(false).withSchedule(newSchedule); + // Also write into the new field. This one will be consumed if populated. + newConnection + .withScheduleType(ScheduleType.BASIC_SCHEDULE); + newConnection.withScheduleData(new ScheduleData().withBasicSchedule( + new BasicSchedule().withTimeUnit(convertTimeUnitSchema(update.getSchedule().getTimeUnit())) + .withUnits(update.getSchedule().getUnits()))); } else { newConnection.withManual(true).withSchedule(null); + newConnection.withScheduleType(ScheduleType.MANUAL).withScheduleData(null); } return newConnection; @@ -124,4 +134,22 @@ public static void validateWorkspace(final WorkspaceHelper workspaceHelper, } } + // Helper method to convert between TimeUnit enums for old and new schedule schemas. + private static BasicSchedule.TimeUnit convertTimeUnitSchema(Schedule.TimeUnit timeUnit) { + switch (timeUnit) { + case MINUTES: + return BasicSchedule.TimeUnit.MINUTES; + case HOURS: + return BasicSchedule.TimeUnit.HOURS; + case DAYS: + return BasicSchedule.TimeUnit.DAYS; + case WEEKS: + return BasicSchedule.TimeUnit.WEEKS; + case MONTHS: + return BasicSchedule.TimeUnit.MONTHS; + default: + throw new RuntimeException("Unhandled TimeUnitEnum: " + timeUnit); + } + } + }