diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java index 2b8767143b98..4a3036bfb742 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.lang.Exceptions; import io.airbyte.config.ConfigSchema; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; @@ -75,7 +76,7 @@ public static ConfigPersistence createWithValidation(final Path storageRoot) thr public FileSystemConfigPersistence(final Path storageRoot) { this.storageRoot = storageRoot; - this.configRoot = storageRoot.resolve(CONFIG_DIR); + this.configRoot = Exceptions.toRuntime(() -> Files.createDirectories(storageRoot.resolve(CONFIG_DIR))); } @Override diff --git a/airbyte-server/build.gradle b/airbyte-server/build.gradle index 3a08d7723a52..9ea1d34ca381 100644 --- a/airbyte-server/build.gradle +++ b/airbyte-server/build.gradle @@ -72,7 +72,8 @@ task copySeed(type: Copy, dependsOn: [project(':airbyte-config:init').processRes } // need to make sure that the files are in the resource directory before copying. -//project.tasks.copySeed.mustRunAfter(project(':airbyte-config:init').tasks.processResources) +// tests require the seed to exist. +test.dependsOn(project.tasks.copySeed) assemble.dependsOn(project.tasks.copySeed) mainClassName = 'io.airbyte.server.ServerApp' diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExport.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java similarity index 96% rename from airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExport.java rename to airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java index 316e0d3f53dc..54a975e60821 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExport.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java @@ -57,7 +57,7 @@ * would fail. 2. Unlike ArchiveHandler, this doesn't take the dump of specific files but looks at * the config directory and takes the full dump of whatever is available */ -public class ConfigDumpExport { +public class ConfigDumpExporter { private static final String ARCHIVE_FILE_NAME = "airbyte_config_dump"; private static final String CONFIG_FOLDER_NAME = "airbyte_config"; @@ -65,12 +65,10 @@ public class ConfigDumpExport { private static final String VERSION_FILE_NAME = "VERSION"; private final ConfigRepository configRepository; private final JobPersistence jobPersistence; - private final String version; - public ConfigDumpExport(ConfigRepository configRepository, JobPersistence jobPersistence, String version) { + public ConfigDumpExporter(ConfigRepository configRepository, JobPersistence jobPersistence) { this.configRepository = configRepository; this.jobPersistence = jobPersistence; - this.version = version; } public File dump() { @@ -89,6 +87,7 @@ public File dump() { } private void exportVersionFile(Path tempFolder) throws IOException { + final String version = jobPersistence.getVersion().orElseThrow(); final File versionFile = Files.createFile(tempFolder.resolve(VERSION_FILE_NAME)).toFile(); FileUtils.writeStringToFile(versionFile, version, Charset.defaultCharset()); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImport.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java similarity index 83% rename from airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImport.java rename to airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java index a57ac80b61e0..39e16bfb32df 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImport.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java @@ -26,11 +26,13 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; import io.airbyte.analytics.TrackingClientSingleton; import io.airbyte.api.model.ImportRead; import io.airbyte.api.model.ImportRead.StatusEnum; +import io.airbyte.commons.enums.Enums; import io.airbyte.commons.io.Archives; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; @@ -71,28 +73,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ConfigDumpImport { +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") +public class ConfigDumpImporter { - private static final Logger LOGGER = LoggerFactory.getLogger(ConfigDumpImport.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigDumpImporter.class); private static final String CONFIG_FOLDER_NAME = "airbyte_config"; private static final String DB_FOLDER_NAME = "airbyte_db"; private static final String VERSION_FILE_NAME = "VERSION"; private final ConfigRepository configRepository; private final JsonSchemaValidator jsonSchemaValidator; private final JobPersistence postgresPersistence; - private final String targetVersion; - private final String initialVersion; - private final Path latestSeed; - - public ConfigDumpImport(String initialVersion, - String targetVersion, - Path latestSeed, - JobPersistence postgresPersistence, - ConfigRepository configRepository) { - this.targetVersion = targetVersion; - this.initialVersion = initialVersion; - this.latestSeed = latestSeed; - this.jsonSchemaValidator = new JsonSchemaValidator(); + + public ConfigDumpImporter(ConfigRepository configRepository, JobPersistence postgresPersistence) { + this(configRepository, postgresPersistence, new JsonSchemaValidator()); + } + + @VisibleForTesting + public ConfigDumpImporter(ConfigRepository configRepository, JobPersistence postgresPersistence, JsonSchemaValidator jsonSchemaValidator) { + this.jsonSchemaValidator = jsonSchemaValidator; this.postgresPersistence = postgresPersistence; this.configRepository = configRepository; } @@ -109,7 +107,17 @@ public Optional getCurrentCustomerId() { } } - public ImportRead importData(File archive) { + public ImportRead importData(String targetVersion, File archive) { + return importDataInternal(targetVersion, archive, Optional.empty()); + } + + public ImportRead importDataWithSeed(String targetVersion, File archive, Path seedPath) { + return importDataInternal(targetVersion, archive, Optional.of(seedPath)); + } + + // seedPath - if present, merge with the import. otherwise just use the data in the import. + private ImportRead importDataInternal(String targetVersion, File archive, Optional seedPath) { + Preconditions.checkNotNull(seedPath); final Optional previousCustomerIdOptional = getCurrentCustomerId(); ImportRead result; @@ -121,10 +129,9 @@ public ImportRead importData(File archive) { // 2. dry run try { - checkImport(sourceRoot); + checkImport(targetVersion, sourceRoot, seedPath); } catch (Exception e) { - LOGGER.warn("Dry run failed, setting DB version back to initial version"); - postgresPersistence.setVersion(initialVersion); + LOGGER.error("Dry run failed.", e); throw e; } @@ -132,7 +139,7 @@ public ImportRead importData(File archive) { importDatabaseFromArchive(sourceRoot, targetVersion); // 4. Import Configs - importConfigsFromArchive(sourceRoot, false); + importConfigsFromArchive(sourceRoot, seedPath, false); // 5. Set DB version LOGGER.info("Setting the DB Airbyte version to : " + targetVersion); @@ -158,7 +165,7 @@ public ImportRead importData(File archive) { return result; } - private void checkImport(Path tempFolder) throws IOException, JsonValidationException { + private void checkImport(String targetVersion, Path tempFolder, Optional seed) throws IOException, JsonValidationException { final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME); final String importVersion = Files.readString(versionFile, Charset.defaultCharset()) .replace("\n", "").strip(); @@ -169,7 +176,7 @@ private void checkImport(Path tempFolder) throws IOException, JsonValidationExce "Please upgrade your Airbyte Archive, see more at https://docs.airbyte.io/tutorials/upgrading-airbyte\n", importVersion, targetVersion)); } - importConfigsFromArchive(tempFolder, true); + importConfigsFromArchive(tempFolder, seed, true); } // Config @@ -180,39 +187,58 @@ private List listDirectories(Path sourceRoot) throws IOException { } } - private void importConfigsFromArchive(final Path sourceRoot, final boolean dryRun) throws IOException, JsonValidationException { - List sourceDefinitionsToMigrate = new ArrayList<>(); - List destinationDefinitionsToMigrate = new ArrayList<>(); + private void importConfigsFromArchive(final Path sourceRoot, Optional seedPath, final boolean dryRun) + throws IOException, JsonValidationException { + final List sourceDefinitionsToMigrate = new ArrayList<>(); + final List destinationDefinitionsToMigrate = new ArrayList<>(); final boolean[] sourceProcessed = {false}; final boolean[] destinationProcessed = {false}; - List directories = listDirectories(sourceRoot); + final List directories = listDirectories(sourceRoot); // We sort the directories cause we want to process SOURCE_CONNECTION before // STANDARD_SOURCE_DEFINITION and DESTINATION_CONNECTION before STANDARD_DESTINATION_DEFINITION // so that we can identify which definitions should not be upgraded to the latest version Collections.sort(directories); - Map> data = new LinkedHashMap<>(); - Map> latestSeeds = latestSeeds(); + final Map> data = new LinkedHashMap<>(); + + final Map> seed; + if (seedPath.isPresent()) { + seed = getSeed(seedPath.get()); + } else { + seed = new HashMap<>(); + } for (String directory : directories) { - ConfigSchema configSchema = ConfigSchema.valueOf(directory.replace(".yaml", "")); + final Optional configSchemaOptional = Enums.toEnum(directory.replace(".yaml", ""), ConfigSchema.class); + + if (configSchemaOptional.isEmpty()) { + continue; + } + + final ConfigSchema configSchema = configSchemaOptional.get(); Stream configs = readConfigsFromArchive(sourceRoot, configSchema); - configs = streamWithAdditionalOperation(sourceDefinitionsToMigrate, destinationDefinitionsToMigrate, sourceProcessed, destinationProcessed, - configSchema, configs, latestSeeds); + configs = streamWithAdditionalOperation( + sourceDefinitionsToMigrate, + destinationDefinitionsToMigrate, + sourceProcessed, + destinationProcessed, + configSchema, + configs, + seed); data.put(configSchema, configs); } configRepository.replaceAllConfigs(data, dryRun); } - private Map> latestSeeds() throws IOException { - List configSchemas = Files.list(latestSeed).map(c -> ConfigSchema.valueOf(c.getFileName().toString())).collect(Collectors.toList()); - Map> allData = new HashMap<>(); + private Map> getSeed(Path seed) throws IOException { + final List configSchemas = Files.list(seed).map(c -> ConfigSchema.valueOf(c.getFileName().toString())).collect(Collectors.toList()); + final Map> allData = new HashMap<>(); for (ConfigSchema configSchema : configSchemas) { - Map data = readLatestSeed(configSchema); + final Map data = readLatestSeed(seed, configSchema); allData.put(configSchema, data); } return allData; } - private Map readLatestSeed(ConfigSchema configSchema) throws IOException { + private Map readLatestSeed(Path latestSeed, ConfigSchema configSchema) throws IOException { try (Stream files = Files.list(latestSeed.resolve(configSchema.toString()))) { final List ids = files .filter(p -> !p.endsWith(".json")) @@ -274,7 +300,7 @@ private Stream getDefinitionStream(List definitionsToMigrate, } return Streams.concat(configs.filter(c -> definitionsToMigrate.contains(configSchema.getId(c))), - latestSeeds.get(configSchema).entrySet().stream().filter(c -> !definitionsToMigrate.contains(c.getKey())) + latestSeeds.getOrDefault(configSchema, new HashMap<>()).entrySet().stream().filter(c -> !definitionsToMigrate.contains(c.getKey())) .map(Entry::getValue)); } @@ -314,8 +340,7 @@ protected static Path buildConfigPath(final Path storageRoot, final ConfigSchema } // Postgres Portion - public void importDatabaseFromArchive(final Path storageRoot, final String airbyteVersion) - throws IOException { + public void importDatabaseFromArchive(final Path storageRoot, final String airbyteVersion) throws IOException { try { final Map> data = new HashMap<>(); for (DatabaseSchema tableType : DatabaseSchema.values()) { @@ -331,8 +356,7 @@ public void importDatabaseFromArchive(final Path storageRoot, final String airby postgresPersistence.importDatabase(airbyteVersion, data); LOGGER.info("Successful upgrade of airbyte postgres database from archive"); } catch (Exception e) { - LOGGER.warn("Postgres database version upgrade failed, setting DB version back to initial version"); - postgresPersistence.setVersion(initialVersion); + LOGGER.warn("Postgres database version upgrade failed, reverting to state previous to migration."); throw e; } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/RunMigration.java b/airbyte-server/src/main/java/io/airbyte/server/RunMigration.java index 7a8a8d4b4952..6058bb818612 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/RunMigration.java +++ b/airbyte-server/src/main/java/io/airbyte/server/RunMigration.java @@ -44,41 +44,40 @@ public class RunMigration implements Runnable, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(RunMigration.class); private final String targetVersion; - private final ConfigDumpExport configDumpExport; - private final ConfigDumpImport configDumpImport; + private final Path seedPath; + private final ConfigDumpExporter configDumpExporter; + private final ConfigDumpImporter configDumpImporter; private final List filesToBeCleanedUp = new ArrayList<>(); - public RunMigration(String initialVersion, - JobPersistence jobPersistence, + public RunMigration(JobPersistence jobPersistence, ConfigRepository configRepository, String targetVersion, - Path latestSeeds) { + Path seedPath) { this.targetVersion = targetVersion; - this.configDumpExport = new ConfigDumpExport(configRepository, jobPersistence, initialVersion); - this.configDumpImport = new ConfigDumpImport(initialVersion, targetVersion, latestSeeds, jobPersistence, configRepository); + this.seedPath = seedPath; + this.configDumpExporter = new ConfigDumpExporter(configRepository, jobPersistence); + this.configDumpImporter = new ConfigDumpImporter(configRepository, jobPersistence); } @Override public void run() { try { // Export data - File exportData = configDumpExport.dump(); + File exportData = configDumpExporter.dump(); filesToBeCleanedUp.add(exportData); // Define output target final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive_output"); - final File output = Files.createTempFile(tempFolder, "airbyte_archive_output", ".tar.gz") - .toFile(); + final File output = Files.createTempFile(tempFolder, "airbyte_archive_output", ".tar.gz").toFile(); filesToBeCleanedUp.add(output); filesToBeCleanedUp.add(tempFolder.toFile()); // Run Migration - MigrateConfig migrateConfig = new MigrateConfig(exportData.toPath(), output.toPath(), - targetVersion); + MigrateConfig migrateConfig = new MigrateConfig(exportData.toPath(), output.toPath(), targetVersion); MigrationRunner.run(migrateConfig); // Import data - ImportRead importRead = configDumpImport.importData(output); + ImportRead importRead = configDumpImporter.importDataWithSeed(targetVersion, output, seedPath); if (importRead.getStatus() == StatusEnum.FAILED) { throw new RuntimeException("Automatic migration failed : " + importRead.getReason()); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 690185763a5a..7e4a49fb3441 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -262,8 +262,11 @@ private static void runAutomaticMigration(ConfigRepository configRepository, LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion + " to version : " + airbyteVersion); final Path latestSeedsPath = Path.of(System.getProperty("user.dir")).resolve("latest_seeds"); LOGGER.info("Last seeds dir: {}", latestSeedsPath); - try (RunMigration runMigration = new RunMigration(airbyteDatabaseVersion, - jobPersistence, configRepository, airbyteVersion, latestSeedsPath)) { + try (final RunMigration runMigration = new RunMigration( + jobPersistence, + configRepository, + airbyteVersion, + latestSeedsPath)) { runMigration.run(); } catch (Exception e) { LOGGER.error("Automatic Migration failed ", e); diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/ConfigFileArchiver.java b/airbyte-server/src/main/java/io/airbyte/server/converters/ConfigFileArchiver.java deleted file mode 100644 index 4966e48ba3b2..000000000000 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/ConfigFileArchiver.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.server.converters; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.lang.Exceptions; -import io.airbyte.commons.yaml.Yamls; -import io.airbyte.config.ConfigSchema; -import io.airbyte.config.DestinationConnection; -import io.airbyte.config.SourceConnection; -import io.airbyte.config.StandardDestinationDefinition; -import io.airbyte.config.StandardSourceDefinition; -import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardSyncOperation; -import io.airbyte.config.StandardWorkspace; -import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.validation.json.JsonSchemaValidator; -import io.airbyte.validation.json.JsonValidationException; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConfigFileArchiver { - - private static final Logger LOGGER = LoggerFactory.getLogger(ConfigFileArchiver.class); - private static final String CONFIG_FOLDER_NAME = "airbyte_config"; - - private final ConfigRepository configRepository; - private final JsonSchemaValidator jsonSchemaValidator; - - public ConfigFileArchiver(final ConfigRepository configRepository, final JsonSchemaValidator jsonSchemaValidator) { - this.configRepository = configRepository; - this.jsonSchemaValidator = jsonSchemaValidator; - } - - public ConfigFileArchiver(final ConfigRepository configRepository) { - this(configRepository, new JsonSchemaValidator()); - } - - public void exportConfigsToArchive(final Path storageRoot) throws ConfigNotFoundException, IOException, JsonValidationException { - writeConfigsToArchive(storageRoot, ConfigSchema.STANDARD_WORKSPACE, configRepository.listStandardWorkspaces(true)); - writeConfigsToArchive(storageRoot, ConfigSchema.STANDARD_SOURCE_DEFINITION, configRepository.listStandardSources()); - writeConfigsToArchive(storageRoot, ConfigSchema.STANDARD_DESTINATION_DEFINITION, configRepository.listStandardDestinationDefinitions()); - writeConfigsToArchive(storageRoot, ConfigSchema.SOURCE_CONNECTION, configRepository.listSourceConnection()); - writeConfigsToArchive(storageRoot, ConfigSchema.DESTINATION_CONNECTION, configRepository.listDestinationConnection()); - final List standardSyncs = configRepository.listStandardSyncs(); - writeConfigsToArchive(storageRoot, ConfigSchema.STANDARD_SYNC, standardSyncs); - final List standardSyncOperations = configRepository.listStandardSyncOperations(); - writeConfigsToArchive(storageRoot, ConfigSchema.STANDARD_SYNC_OPERATION, standardSyncOperations); - } - - /** - * Takes configuration objects from @param configList with schema @param schemaType and serializes - * them into a single archive file stored in YAML. Objects will be ordered by their String - * representation in the archive. - */ - private void writeConfigsToArchive(final Path storageRoot, final ConfigSchema schemaType, final List configList) throws IOException { - final Path configPath = buildConfigPath(storageRoot, schemaType); - Files.createDirectories(configPath.getParent()); - if (!configList.isEmpty()) { - final List sortedConfigs = configList.stream().sorted(Comparator.comparing(T::toString)).collect(Collectors.toList()); - Files.writeString(configPath, Yamls.serialize(sortedConfigs)); - LOGGER.debug(String.format("Successful export of airbyte config %s", schemaType)); - } else { - // Create empty file - Files.createFile(configPath); - } - } - - public void importConfigsFromArchive(final Path storageRoot, final boolean dryRun) - throws IOException, JsonValidationException { - Exceptions.toRuntime(() -> { - if (dryRun) { - readConfigsFromArchive(storageRoot, ConfigSchema.STANDARD_WORKSPACE, StandardWorkspace.class); - readConfigsFromArchive(storageRoot, ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class); - readConfigsFromArchive(storageRoot, ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class); - readConfigsFromArchive(storageRoot, ConfigSchema.SOURCE_CONNECTION, SourceConnection.class); - readConfigsFromArchive(storageRoot, ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class); - readConfigsFromArchive(storageRoot, ConfigSchema.STANDARD_SYNC, StandardSync.class); - readConfigsFromArchive(storageRoot, ConfigSchema.STANDARD_SYNC_OPERATION, StandardSyncOperation.class); - } else { - readConfigsFromArchive(storageRoot, ConfigSchema.STANDARD_WORKSPACE, StandardWorkspace.class) - .forEach(config -> Exceptions.toRuntime(() -> configRepository.writeStandardWorkspace(config))); - readConfigsFromArchive(storageRoot, ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class) - .forEach(config -> Exceptions.toRuntime(() -> configRepository.writeStandardSource(config))); - readConfigsFromArchive(storageRoot, ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class) - .forEach(config -> Exceptions.toRuntime(() -> configRepository.writeStandardDestinationDefinition(config))); - readConfigsFromArchive(storageRoot, ConfigSchema.SOURCE_CONNECTION, SourceConnection.class) - .forEach(config -> Exceptions.toRuntime(() -> configRepository.writeSourceConnection(config))); - readConfigsFromArchive(storageRoot, ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class) - .forEach(config -> Exceptions.toRuntime(() -> configRepository.writeDestinationConnection(config))); - readConfigsFromArchive(storageRoot, ConfigSchema.STANDARD_SYNC, StandardSync.class) - .forEach(config -> Exceptions.toRuntime(() -> configRepository.writeStandardSync(config))); - readConfigsFromArchive(storageRoot, ConfigSchema.STANDARD_SYNC_OPERATION, StandardSyncOperation.class) - .forEach(config -> Exceptions.toRuntime(() -> configRepository.writeStandardSyncOperation(config)));; - LOGGER.debug("Successful import of airbyte configs"); - } - }); - } - - /** - * Reads a YAML configuration archive file and deserializes them into a list of configuration - * objects. The objects will be validated against the current version of Airbyte server's JSON - * Schema @param schemaType. - */ - private List readConfigsFromArchive(final Path storageRoot, final ConfigSchema schemaType, final Class clazz) - throws IOException, JsonValidationException { - final List results = new ArrayList<>(); - final Path configPath = buildConfigPath(storageRoot, schemaType); - if (configPath.toFile().exists()) { - final String configStr = Files.readString(configPath); - final JsonNode node = Yamls.deserialize(configStr); - final Iterator it = node.elements(); - while (it.hasNext()) { - final JsonNode element = it.next(); - final T config = Jsons.object(element, clazz); - validateJson(config, schemaType); - results.add(config); - } - LOGGER.debug(String.format("Successful read of airbyte config %s from archive", schemaType)); - } else { - throw new FileNotFoundException(String.format("Airbyte Configuration %s was not found in the archive", schemaType)); - } - return results; - } - - private void validateJson(final T config, final ConfigSchema configType) throws JsonValidationException { - JsonNode schema = JsonSchemaValidator.getSchema(configType.getFile()); - jsonSchemaValidator.ensure(schema, Jsons.jsonNode(config)); - } - - protected static Path buildConfigPath(final Path storageRoot, final ConfigSchema schemaType) { - return storageRoot.resolve(CONFIG_FOLDER_NAME) - .resolve(String.format("%s.yaml", schemaType.name())); - } - -} diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java index 9d1bf4941622..7bc55c33b26b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java @@ -24,22 +24,18 @@ package io.airbyte.server.handlers; +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.analytics.TrackingClient; import io.airbyte.analytics.TrackingClientSingleton; import io.airbyte.api.model.ImportRead; import io.airbyte.api.model.ImportRead.StatusEnum; -import io.airbyte.commons.io.Archives; import io.airbyte.commons.io.FileTtlManager; -import io.airbyte.commons.version.AirbyteVersion; -import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.PersistenceConstants; import io.airbyte.scheduler.persistence.JobPersistence; -import io.airbyte.server.converters.ConfigFileArchiver; -import io.airbyte.server.converters.DatabaseArchiver; -import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.server.ConfigDumpExporter; +import io.airbyte.server.ConfigDumpImporter; import java.io.File; -import java.io.IOException; -import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.util.Optional; @@ -51,24 +47,40 @@ public class ArchiveHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ArchiveHandler.class); - private static final String ARCHIVE_FILE_NAME = "airbyte_archive"; - private static final String VERSION_FILE_NAME = "VERSION"; private final String version; private final ConfigRepository configRepository; - private final ConfigFileArchiver configFileArchiver; - private final DatabaseArchiver databaseArchiver; + private final ConfigDumpExporter configDumpExporter; + private final ConfigDumpImporter configDumpImporter; private final FileTtlManager fileTtlManager; + private final TrackingClient trackingClient; public ArchiveHandler(final String version, final ConfigRepository configRepository, - final JobPersistence persistence, + final JobPersistence jobPersistence, final FileTtlManager fileTtlManager) { + this( + version, + configRepository, + fileTtlManager, + new ConfigDumpExporter(configRepository, jobPersistence), + new ConfigDumpImporter(configRepository, jobPersistence), + TrackingClientSingleton.get()); + } + + @VisibleForTesting + ArchiveHandler(final String version, + final ConfigRepository configRepository, + final FileTtlManager fileTtlManager, + final ConfigDumpExporter configDumpExporter, + final ConfigDumpImporter configDumpImporter, + final TrackingClient trackingClient) { this.version = version; this.configRepository = configRepository; - configFileArchiver = new ConfigFileArchiver(configRepository); - databaseArchiver = new DatabaseArchiver(persistence); + this.configDumpExporter = configDumpExporter; + this.configDumpImporter = configDumpImporter; this.fileTtlManager = fileTtlManager; + this.trackingClient = trackingClient; } /** @@ -77,32 +89,9 @@ public ArchiveHandler(final String version, * @return that tarball File. */ public File exportData() { - try { - final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), ARCHIVE_FILE_NAME); - final File archive = Files.createTempFile(ARCHIVE_FILE_NAME, ".tar.gz").toFile(); - fileTtlManager.register(archive.toPath()); - try { - exportVersionFile(tempFolder); - configFileArchiver.exportConfigsToArchive(tempFolder); - databaseArchiver.exportDatabaseToArchive(tempFolder); - Archives.createArchive(tempFolder, archive.toPath()); - } catch (Exception e) { - LOGGER.error("Export Data failed."); - FileUtils.deleteQuietly(archive); - throw new RuntimeException(e); - } finally { - FileUtils.deleteDirectory(tempFolder.toFile()); - } - return archive; - } catch (IOException e) { - LOGGER.error("Export Data failed."); - throw new RuntimeException(e); - } - } - - private void exportVersionFile(Path tempFolder) throws IOException { - final File versionFile = Files.createFile(tempFolder.resolve(VERSION_FILE_NAME)).toFile(); - FileUtils.writeStringToFile(versionFile, version, Charset.defaultCharset()); + final File archive = configDumpExporter.dump(); + fileTtlManager.register(archive.toPath()); + return archive; } /** @@ -119,40 +108,23 @@ public ImportRead importData(File archive) { try { final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive"); try { - Archives.extractArchive(archive.toPath(), tempFolder); - checkImport(tempFolder); - databaseArchiver.importDatabaseFromArchive(tempFolder, version); - configFileArchiver.importConfigsFromArchive(tempFolder, false); + configDumpImporter.importData(version, archive); result = new ImportRead().status(StatusEnum.SUCCEEDED); } finally { FileUtils.deleteDirectory(tempFolder.toFile()); FileUtils.deleteQuietly(archive); } - - // identify this instance as the new customer id. - TrackingClientSingleton.get().identify(); - // report that the previous customer id is now superseded by the imported one. - previousCustomerIdOptional.ifPresent(previousCustomerId -> TrackingClientSingleton.get().alias(previousCustomerId.toString())); - } catch (IOException | JsonValidationException | ConfigNotFoundException | RuntimeException e) { + } catch (Exception e) { LOGGER.error("Import failed", e); result = new ImportRead().status(StatusEnum.FAILED).reason(e.getMessage()); } - return result; - } + // identify this instance as the new customer id. + trackingClient.identify(); + // report that the previous customer id is now superseded by the imported one. + previousCustomerIdOptional.ifPresent(previousCustomerId -> trackingClient.alias(previousCustomerId.toString())); - private void checkImport(Path tempFolder) throws IOException, JsonValidationException, ConfigNotFoundException { - final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME); - final String importVersion = Files.readString(versionFile, Charset.defaultCharset()).replace("\n", "").strip(); - LOGGER.info(String.format("Checking Airbyte Version to import %s", importVersion)); - if (!AirbyteVersion.isCompatible(version, importVersion)) { - throw new IOException(String.format("Imported VERSION (%s) is incompatible with current Airbyte version (%s).\n" + - "Please upgrade your Airbyte Archive, see more at https://docs.airbyte.io/tutorials/upgrading-airbyte\n", - importVersion, version)); - } - databaseArchiver.checkVersion(version); - // Check if all files to import are valid and with expected airbyte version - configFileArchiver.importConfigsFromArchive(tempFolder, true); + return result; } private Optional getCurrentCustomerId() { diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/ConfigFileArchiverTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/ConfigFileArchiverTest.java deleted file mode 100644 index d026d713a540..000000000000 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/ConfigFileArchiverTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.server.converters; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import io.airbyte.config.ConfigSchema; -import io.airbyte.config.DestinationConnection; -import io.airbyte.config.Notification; -import io.airbyte.config.Notification.NotificationType; -import io.airbyte.config.SlackNotificationConfiguration; -import io.airbyte.config.SourceConnection; -import io.airbyte.config.StandardDestinationDefinition; -import io.airbyte.config.StandardSourceDefinition; -import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardWorkspace; -import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.PersistenceConstants; -import io.airbyte.server.helpers.ConnectionHelpers; -import io.airbyte.server.helpers.DestinationDefinitionHelpers; -import io.airbyte.server.helpers.DestinationHelpers; -import io.airbyte.server.helpers.SourceDefinitionHelpers; -import io.airbyte.server.helpers.SourceHelpers; -import io.airbyte.validation.json.JsonValidationException; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.List; -import java.util.UUID; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -public class ConfigFileArchiverTest { - - private ConfigRepository configRepository; - private ConfigFileArchiver configFileArchiver; - - @BeforeEach - void setUp() { - configRepository = mock(ConfigRepository.class); - configFileArchiver = new ConfigFileArchiver(configRepository); - } - - private StandardWorkspace generateWorkspace() { - final UUID workspaceId = PersistenceConstants.DEFAULT_WORKSPACE_ID; - - return new StandardWorkspace() - .withWorkspaceId(workspaceId) - .withCustomerId(UUID.randomUUID()) - .withEmail("test@airbyte.io") - .withName("test workspace") - .withSlug("default") - .withInitialSetupComplete(false) - .withTombstone(false) - .withNotifications(List.of(new Notification() - .withNotificationType(NotificationType.SLACK) - .withSlackConfiguration(new SlackNotificationConfiguration() - .withWebhook("http://airbyte.notifications")))); - } - - @Test - void testConfigExportImport() throws ConfigNotFoundException, IOException, JsonValidationException { - final StandardWorkspace workspace = generateWorkspace(); - final StandardSourceDefinition standardSource = SourceDefinitionHelpers.generateSource(); - final StandardDestinationDefinition standardDestination = DestinationDefinitionHelpers.generateDestination(); - final SourceConnection sourceConnection1 = SourceHelpers.generateSource(standardSource.getSourceDefinitionId()); - final SourceConnection sourceConnection2 = SourceHelpers.generateSource(standardSource.getSourceDefinitionId()); - final DestinationConnection destinationConnection = DestinationHelpers.generateDestination(standardDestination.getDestinationDefinitionId()); - final StandardSync destinationSync = ConnectionHelpers.generateSyncWithDestinationId(destinationConnection.getDestinationId()); - final StandardSync sourceSync = ConnectionHelpers.generateSyncWithSourceId(sourceConnection1.getSourceId()); - - // Read operations - when(configRepository.listStandardWorkspaces(true)).thenReturn(List.of(workspace)); - when(configRepository.listStandardSources()).thenReturn(List.of(standardSource)); - when(configRepository.listStandardDestinationDefinitions()).thenReturn(List.of(standardDestination)); - when(configRepository.listSourceConnection()).thenReturn(List.of(sourceConnection1, sourceConnection2)); - when(configRepository.listDestinationConnection()).thenReturn(List.of(destinationConnection)); - when(configRepository.listStandardSyncs()).thenReturn(List.of(destinationSync, sourceSync)); - when(configRepository.getStandardSync(sourceSync.getConnectionId())).thenReturn(sourceSync); - when(configRepository.getStandardSync(destinationSync.getConnectionId())).thenReturn(destinationSync); - - final Path tempFolder = Files.createTempDirectory("testConfigMigration"); - configFileArchiver.exportConfigsToArchive(tempFolder); - configFileArchiver.importConfigsFromArchive(tempFolder, false); - - verify(configRepository).writeStandardWorkspace(workspace); - verify(configRepository).writeStandardSource(standardSource); - verify(configRepository).writeStandardDestinationDefinition(standardDestination); - verify(configRepository).writeSourceConnection(sourceConnection1); - verify(configRepository).writeSourceConnection(sourceConnection2); - verify(configRepository).writeDestinationConnection(destinationConnection); - verify(configRepository).writeStandardSync(sourceSync); - verify(configRepository).writeStandardSync(destinationSync); - } - - @Test - void testPartialConfigImport() throws ConfigNotFoundException, IOException, JsonValidationException { - final StandardWorkspace workspace = generateWorkspace(); - final StandardSourceDefinition standardSource = SourceDefinitionHelpers.generateSource(); - final StandardDestinationDefinition standardDestination = DestinationDefinitionHelpers.generateDestination(); - final SourceConnection sourceConnection1 = SourceHelpers.generateSource(standardSource.getSourceDefinitionId()); - final SourceConnection sourceConnection2 = SourceHelpers.generateSource(standardSource.getSourceDefinitionId()); - final DestinationConnection destinationConnection = DestinationHelpers.generateDestination(standardDestination.getDestinationDefinitionId()); - final StandardSync destinationSync = ConnectionHelpers.generateSyncWithDestinationId(destinationConnection.getDestinationId()); - final StandardSync sourceSync = ConnectionHelpers.generateSyncWithSourceId(sourceConnection1.getSourceId()); - - // Read operations - when(configRepository.listStandardWorkspaces(true)).thenReturn(List.of(workspace)); - when(configRepository.listStandardSources()).thenReturn(List.of(standardSource)); - when(configRepository.listStandardDestinationDefinitions()).thenReturn(List.of(standardDestination)); - when(configRepository.listSourceConnection()).thenReturn(List.of(sourceConnection1, sourceConnection2)); - when(configRepository.listDestinationConnection()).thenReturn(List.of(destinationConnection)); - when(configRepository.listStandardSyncs()).thenReturn(List.of(destinationSync, sourceSync)); - when(configRepository.getStandardSync(sourceSync.getConnectionId())).thenReturn(sourceSync); - - final Path tempFolder = Files.createTempDirectory("testConfigMigration"); - configFileArchiver.exportConfigsToArchive(tempFolder); - Files.delete(ConfigFileArchiver.buildConfigPath(tempFolder, ConfigSchema.STANDARD_DESTINATION_DEFINITION)); - assertThrows(RuntimeException.class, () -> configFileArchiver.importConfigsFromArchive(tempFolder, false)); - } - -} diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java index f51798ffe9b6..3722d8b4ca3a 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java @@ -24,40 +24,84 @@ package io.airbyte.server.handlers; -import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.airbyte.analytics.TrackingClient; +import io.airbyte.api.model.ImportRead; +import io.airbyte.api.model.ImportRead.StatusEnum; import io.airbyte.commons.io.FileTtlManager; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.config.persistence.PersistenceConstants; +import io.airbyte.server.ConfigDumpExporter; +import io.airbyte.server.ConfigDumpImporter; import io.airbyte.validation.json.JsonValidationException; +import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class ArchiveHandlerTest { + private static final String VERSION = "test-version"; + private ConfigRepository configRepository; private ArchiveHandler archiveHandler; + private FileTtlManager fileTtlManager; + private ConfigDumpExporter configDumpExporter; + private ConfigDumpImporter configDumpImporter; + private TrackingClient trackingClient; @BeforeEach void setUp() { configRepository = mock(ConfigRepository.class); - archiveHandler = new ArchiveHandler("test-version", configRepository, mock(JobPersistence.class), mock(FileTtlManager.class)); + fileTtlManager = mock(FileTtlManager.class); + configDumpExporter = mock(ConfigDumpExporter.class); + configDumpImporter = mock(ConfigDumpImporter.class); + trackingClient = mock(TrackingClient.class); + archiveHandler = new ArchiveHandler( + VERSION, + configRepository, + fileTtlManager, + configDumpExporter, + configDumpImporter, + trackingClient); } @Test - void testEmptyMigration() throws JsonValidationException, IOException { - archiveHandler.importData(archiveHandler.exportData()); - verify(configRepository, never()).writeStandardWorkspace(any()); - verify(configRepository, never()).writeStandardSource(any()); - verify(configRepository, never()).writeStandardDestinationDefinition(any()); - verify(configRepository, never()).writeSourceConnection(any()); - verify(configRepository, never()).writeDestinationConnection(any()); - verify(configRepository, never()).writeStandardSync(any()); - verify(configRepository, never()).writeStandardSyncOperation(any()); + void testExport() throws IOException { + final File file = Files.createTempFile(Path.of("/tmp"), "dump_file", "dump_file").toFile(); + when(configDumpExporter.dump()).thenReturn(file); + + assertEquals(file, archiveHandler.exportData()); + + verify(configDumpExporter).dump(); + verify(fileTtlManager).register(file.toPath()); + } + + @Test + void testImport() throws IOException, JsonValidationException, ConfigNotFoundException { + final File file = Files.createTempFile(Path.of("/tmp"), "dump_file", "dump_file").toFile(); + final UUID customerId = UUID.randomUUID(); + when(configRepository.getStandardWorkspace(PersistenceConstants.DEFAULT_WORKSPACE_ID, true)) + .thenReturn(new StandardWorkspace().withCustomerId(customerId)); + + assertEquals(new ImportRead().status(StatusEnum.SUCCEEDED), archiveHandler.importData(file)); + + // make sure it cleans up the file. + assertFalse(Files.exists(file.toPath())); + + verify(trackingClient).identify(); + verify(trackingClient).alias(customerId.toString()); + verify(configDumpImporter).importData(VERSION, file); } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/DatabaseArchiver.java b/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiver.java similarity index 97% rename from airbyte-server/src/main/java/io/airbyte/server/converters/DatabaseArchiver.java rename to airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiver.java index da2f22a910d6..80b563e36dd4 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/DatabaseArchiver.java +++ b/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiver.java @@ -22,7 +22,7 @@ * SOFTWARE. */ -package io.airbyte.server.converters; +package io.airbyte.server.migration; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.io.IOs; @@ -48,6 +48,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +// todo (cgardens) - kill this class. only kept around because it is hard to extricate from the +// {@link RunMigrationTest} test. public class DatabaseArchiver { private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseArchiver.class); diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/DatabaseArchiverTest.java b/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiverTest.java similarity index 99% rename from airbyte-server/src/test/java/io/airbyte/server/converters/DatabaseArchiverTest.java rename to airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiverTest.java index d0b5a5242723..10a25b3cd05b 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/DatabaseArchiverTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiverTest.java @@ -22,7 +22,7 @@ * SOFTWARE. */ -package io.airbyte.server.converters; +package io.airbyte.server.migration; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; diff --git a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java index e64883d1ad41..c07448129b93 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java @@ -49,7 +49,6 @@ import io.airbyte.scheduler.persistence.DefaultJobPersistence; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.server.RunMigration; -import io.airbyte.server.converters.DatabaseArchiver; import io.airbyte.validation.json.JsonValidationException; import java.io.File; import java.io.IOException; @@ -293,7 +292,6 @@ private void assertDestinations(ConfigRepository configRepository) throws JsonVa private void runMigration(JobPersistence jobPersistence, Path configRoot) throws Exception { try (RunMigration runMigration = new RunMigration( - INITIAL_VERSION, jobPersistence, new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot)), TARGET_VERSION,