Skip to content

Commit

Permalink
Dual write old and new schedule schemas (#15039)
Browse files Browse the repository at this point in the history
* dual write old and new schedule schemas

* validate that the old and new schedule types match
  • Loading branch information
mfsiega-airbyte authored Aug 3, 2022
1 parent e7b9e00 commit 3b85a74
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import io.airbyte.config.BasicSchedule;
import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
import java.util.concurrent.TimeUnit;

@SuppressWarnings("PMD.AvoidThrowingRawExceptionTypes")
Expand Down Expand Up @@ -53,4 +55,12 @@ public static Long getIntervalInSecond(final BasicSchedule schedule) {
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
}

public static boolean isScheduleTypeMismatch(final StandardSync standardSync) {
if (standardSync.getScheduleType() == null) {
return false;
}
return (standardSync.getManual() && standardSync.getScheduleType() != ScheduleType.MANUAL) || (!standardSync.getManual()
&& standardSync.getScheduleType() == ScheduleType.MANUAL);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.State;
import io.airbyte.config.WorkspaceServiceAccount;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
Expand Down Expand Up @@ -617,6 +618,9 @@ private List<ConfigWithMetadata<StandardSync>> listStandardSyncWithMetadata(fina
final List<ConfigWithMetadata<StandardSync>> standardSyncs = new ArrayList<>();
for (final Record record : result) {
final StandardSync standardSync = DbConverter.buildStandardSync(record, connectionOperationIds(record.get(CONNECTION.ID)));
if (ScheduleHelpers.isScheduleTypeMismatch(standardSync)) {
throw new RuntimeException("unexpected schedule type mismatch");
}
standardSyncs.add(new ConfigWithMetadata<>(
record.get(CONNECTION.ID).toString(),
ConfigSchema.STANDARD_SYNC.name(),
Expand Down Expand Up @@ -1080,6 +1084,10 @@ private void writeStandardSync(final List<StandardSync> configs, final DSLContex
.from(CONNECTION)
.where(CONNECTION.ID.eq(standardSync.getConnectionId())));

if (ScheduleHelpers.isScheduleTypeMismatch(standardSync)) {
throw new RuntimeException("unexpected schedule type mismatch");
}

if (isExistingConfig) {
ctx.update(CONNECTION)
.set(CONNECTION.ID, standardSync.getConnectionId())
Expand All @@ -1096,6 +1104,11 @@ private void writeStandardSync(final List<StandardSync> configs, final DSLContex
io.airbyte.db.instance.configs.jooq.generated.enums.StatusType.class).orElseThrow())
.set(CONNECTION.SCHEDULE, JSONB.valueOf(Jsons.serialize(standardSync.getSchedule())))
.set(CONNECTION.MANUAL, standardSync.getManual())
.set(CONNECTION.SCHEDULE_TYPE,
standardSync.getScheduleType() == null ? null
: Enums.toEnum(standardSync.getScheduleType().value(), io.airbyte.db.instance.configs.jooq.generated.enums.ScheduleType.class)
.orElseThrow())
.set(CONNECTION.SCHEDULE_DATA, JSONB.valueOf(Jsons.serialize(standardSync.getScheduleData())))
.set(CONNECTION.RESOURCE_REQUIREMENTS, JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements())))
.set(CONNECTION.UPDATED_AT, timestamp)
.set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId())
Expand Down Expand Up @@ -1130,6 +1143,11 @@ private void writeStandardSync(final List<StandardSync> configs, final DSLContex
io.airbyte.db.instance.configs.jooq.generated.enums.StatusType.class).orElseThrow())
.set(CONNECTION.SCHEDULE, JSONB.valueOf(Jsons.serialize(standardSync.getSchedule())))
.set(CONNECTION.MANUAL, standardSync.getManual())
.set(CONNECTION.SCHEDULE_TYPE,
standardSync.getScheduleType() == null ? null
: Enums.toEnum(standardSync.getScheduleType().value(), io.airbyte.db.instance.configs.jooq.generated.enums.ScheduleType.class)
.orElseThrow())
.set(CONNECTION.SCHEDULE_DATA, JSONB.valueOf(Jsons.serialize(standardSync.getScheduleData())))
.set(CONNECTION.RESOURCE_REQUIREMENTS, JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements())))
.set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId())
.set(CONNECTION.CREATED_AT, timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.api.model.generated.JobTypeResourceLimit;
import io.airbyte.api.model.generated.ResourceRequirements;
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSync;
Expand Down Expand Up @@ -164,4 +165,8 @@ public static Schedule.TimeUnit toPersistenceTimeUnit(final ConnectionSchedule.T
return Enums.convertTo(apiTimeUnit, Schedule.TimeUnit.class);
}

public static BasicSchedule.TimeUnit toBasicScheduleTimeUnit(final ConnectionSchedule.TimeUnitEnum apiTimeUnit) {
return Enums.convertTo(apiTimeUnit, BasicSchedule.TimeUnit.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
import io.airbyte.config.ScheduleData;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
Expand Down Expand Up @@ -141,11 +144,20 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
final Schedule schedule = new Schedule()
.withTimeUnit(ApiPojoConverters.toPersistenceTimeUnit(connectionCreate.getSchedule().getTimeUnit()))
.withUnits(connectionCreate.getSchedule().getUnits());
// Populate the legacy field.
// TODO(https://github.com/airbytehq/airbyte/issues/11432): remove.
standardSync
.withManual(false)
.withSchedule(schedule);
// Also write into the new field. This one will be consumed if populated.
standardSync
.withScheduleType(ScheduleType.BASIC_SCHEDULE);
standardSync.withScheduleData(new ScheduleData().withBasicSchedule(
new BasicSchedule().withTimeUnit(ApiPojoConverters.toBasicScheduleTimeUnit(connectionCreate.getSchedule().getTimeUnit()))
.withUnits(connectionCreate.getSchedule().getUnits())));
} else {
standardSync.withManual(true);
standardSync.withScheduleType(ScheduleType.MANUAL);
}

configRepository.writeStandardSync(standardSync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -116,6 +117,8 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
.withOperationIds(List.of(operationId))
.withManual(false)
.withSchedule(ConnectionHelpers.generateBasicSchedule())
.withScheduleType(ScheduleType.BASIC_SCHEDULE)
.withScheduleData(ConnectionHelpers.generateBasicScheduleData())
.withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS)
.withSourceCatalogId(UUID.randomUUID());
standardSyncDeleted = new StandardSync()
Expand Down Expand Up @@ -328,6 +331,7 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept
.withStatus(StandardSync.Status.INACTIVE)
.withCatalog(configuredCatalog)
.withManual(true)
.withScheduleType(ScheduleType.MANUAL)
.withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS)
.withSourceCatalogId(newSourceCatalogId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io.airbyte.api.model.generated.ResourceRequirements;
import io.airbyte.api.model.generated.SyncMode;
import io.airbyte.commons.text.Names;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
import io.airbyte.config.Schedule.TimeUnit;
import io.airbyte.config.ScheduleData;
import io.airbyte.config.StandardSync;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand All @@ -41,6 +43,8 @@ public class ConnectionHelpers {
private static final String FIELD_NAME = "id";
private static final String BASIC_SCHEDULE_TIME_UNIT = "days";
private static final long BASIC_SCHEDULE_UNITS = 1L;
private static final String BASIC_SCHEDULE_DATA_TIME_UNITS = "days";
private static final long BASIC_SCHEDULE_DATA_UNITS = 1L;

public static final StreamDescriptor STREAM_DESCRIPTOR = new StreamDescriptor().withName(STREAM_NAME);

Expand Down Expand Up @@ -100,6 +104,12 @@ public static Schedule generateBasicSchedule() {
.withUnits(BASIC_SCHEDULE_UNITS);
}

public static ScheduleData generateBasicScheduleData() {
return new ScheduleData().withBasicSchedule(new BasicSchedule()
.withTimeUnit(BasicSchedule.TimeUnit.fromValue((BASIC_SCHEDULE_DATA_TIME_UNITS)))
.withUnits(BASIC_SCHEDULE_DATA_UNITS));
}

public static ConnectionRead generateExpectedConnectionRead(final UUID connectionId,
final UUID sourceId,
final UUID destinationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
import com.google.common.base.Preconditions;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
import io.airbyte.config.ScheduleData;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
Expand Down Expand Up @@ -89,8 +92,15 @@ public static StandardSync updateConnectionObject(final WorkspaceHelper workspac
.withTimeUnit(update.getSchedule().getTimeUnit())
.withUnits(update.getSchedule().getUnits());
newConnection.withManual(false).withSchedule(newSchedule);
// Also write into the new field. This one will be consumed if populated.
newConnection
.withScheduleType(ScheduleType.BASIC_SCHEDULE);
newConnection.withScheduleData(new ScheduleData().withBasicSchedule(
new BasicSchedule().withTimeUnit(convertTimeUnitSchema(update.getSchedule().getTimeUnit()))
.withUnits(update.getSchedule().getUnits())));
} else {
newConnection.withManual(true).withSchedule(null);
newConnection.withScheduleType(ScheduleType.MANUAL).withScheduleData(null);
}

return newConnection;
Expand Down Expand Up @@ -124,4 +134,22 @@ public static void validateWorkspace(final WorkspaceHelper workspaceHelper,
}
}

// Helper method to convert between TimeUnit enums for old and new schedule schemas.
private static BasicSchedule.TimeUnit convertTimeUnitSchema(Schedule.TimeUnit timeUnit) {
switch (timeUnit) {
case MINUTES:
return BasicSchedule.TimeUnit.MINUTES;
case HOURS:
return BasicSchedule.TimeUnit.HOURS;
case DAYS:
return BasicSchedule.TimeUnit.DAYS;
case WEEKS:
return BasicSchedule.TimeUnit.WEEKS;
case MONTHS:
return BasicSchedule.TimeUnit.MONTHS;
default:
throw new RuntimeException("Unhandled TimeUnitEnum: " + timeUnit);
}
}

}

0 comments on commit 3b85a74

Please sign in to comment.