Skip to content

Commit

Permalink
move file migration logic in ServerApp to its own method and mark as … (
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Oct 30, 2021
1 parent 58902f3 commit 8d95cb8
Showing 1 changed file with 38 additions and 16 deletions.
54 changes: 38 additions & 16 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,22 +218,15 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
final SpecCachingSynchronousSchedulerClient cachingSchedulerClient = new SpecCachingSynchronousSchedulerClient(bucketSpecCacheSchedulerClient);
final SpecFetcher specFetcher = new SpecFetcher(cachingSchedulerClient);

// required before migration
// TODO: remove this specFetcherFn logic once file migrations are deprecated
configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.getSpec(dockerImage)));

Optional<AirbyteVersion> airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) {
final boolean isKubernetes = configs.getWorkerEnvironment() == WorkerEnvironment.KUBERNETES;
final boolean versionSupportsAutoMigrate = airbyteDatabaseVersion.get().greaterThanOrEqualTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION);
if (!isKubernetes || versionSupportsAutoMigrate) {
runAutomaticMigration(configRepository, jobPersistence, seed, specFetcher, airbyteVersion, airbyteDatabaseVersion.get());
// After migration, upgrade the DB version
airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
} else {
LOGGER.info("Can not run automatic migration for Airbyte on KUBERNETES before version " + KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION.serialize());
}
}
// todo (cgardens) - this method is deprecated. new migrations are not run using this code path. it
// is scheduled to be removed.
final Optional<AirbyteVersion> airbyteDatabaseVersion = runFileMigration(
airbyteVersion,
configRepository,
seed,
specFetcher,
jobPersistence,
configs);

if (airbyteDatabaseVersion.isPresent() && AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion.get())) {
LOGGER.info("Starting server...");
Expand All @@ -258,6 +251,35 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
}
}

@Deprecated
@SuppressWarnings({"DeprecatedIsStillUsed"})
private static Optional<AirbyteVersion> runFileMigration(final AirbyteVersion airbyteVersion,
final ConfigRepository configRepository,
final ConfigPersistence seed,
final SpecFetcher specFetcher,
final JobPersistence jobPersistence,
final Configs configs)
throws IOException {
// required before migration
// TODO: remove this specFetcherFn logic once file migrations are deprecated
configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.getSpec(dockerImage)));

Optional<AirbyteVersion> airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) {
final boolean isKubernetes = configs.getWorkerEnvironment() == WorkerEnvironment.KUBERNETES;
final boolean versionSupportsAutoMigrate = airbyteDatabaseVersion.get().greaterThanOrEqualTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION);
if (!isKubernetes || versionSupportsAutoMigrate) {
runAutomaticMigration(configRepository, jobPersistence, seed, specFetcher, airbyteVersion, airbyteDatabaseVersion.get());
// After migration, upgrade the DB version
airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
} else {
LOGGER.info("Can not run automatic migration for Airbyte on KUBERNETES before version " + KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION.serialize());
}
}

return airbyteDatabaseVersion;
}

public static void main(final String[] args) throws Exception {
getServer(new ServerFactory.Api(), YamlSeedConfigPersistence.getDefault()).start();
}
Expand Down

0 comments on commit 8d95cb8

Please sign in to comment.