Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor import / export endpoints to use the same code path as auto … #4976

Merged
merged 2 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.config.ConfigSchema;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand Down Expand Up @@ -75,7 +76,7 @@ public static ConfigPersistence createWithValidation(final Path storageRoot) thr

public FileSystemConfigPersistence(final Path storageRoot) {
this.storageRoot = storageRoot;
this.configRoot = storageRoot.resolve(CONFIG_DIR);
this.configRoot = Exceptions.toRuntime(() -> Files.createDirectories(storageRoot.resolve(CONFIG_DIR)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cgardens, this is only needed for testing, right?

In production the config directory should not be created by this constructor. This is because when the server launches, it is possible that the config volume is not ready yet. It should wait for the config volume, instead of creating this directory itself. We have this logic in the createWithValidation method.

This is the PR with more details: #4835.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that makes sense. I'll revert this part.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed it here in this PR: #4987

}

@Override
Expand Down
3 changes: 2 additions & 1 deletion airbyte-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ task copySeed(type: Copy, dependsOn: [project(':airbyte-config:init').processRes
}

// need to make sure that the files are in the resource directory before copying.
//project.tasks.copySeed.mustRunAfter(project(':airbyte-config:init').tasks.processResources)
// tests require the seed to exist.
test.dependsOn(project.tasks.copySeed)
assemble.dependsOn(project.tasks.copySeed)

mainClassName = 'io.airbyte.server.ServerApp'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,18 @@
* would fail. 2. Unlike ArchiveHandler, this doesn't take the dump of specific files but looks at
* the config directory and takes the full dump of whatever is available
*/
public class ConfigDumpExport {
public class ConfigDumpExporter {

private static final String ARCHIVE_FILE_NAME = "airbyte_config_dump";
private static final String CONFIG_FOLDER_NAME = "airbyte_config";
private static final String DB_FOLDER_NAME = "airbyte_db";
private static final String VERSION_FILE_NAME = "VERSION";
private final ConfigRepository configRepository;
private final JobPersistence jobPersistence;
private final String version;

public ConfigDumpExport(ConfigRepository configRepository, JobPersistence jobPersistence, String version) {
public ConfigDumpExporter(ConfigRepository configRepository, JobPersistence jobPersistence) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.version = version;
}

public File dump() {
Expand All @@ -89,6 +87,7 @@ public File dump() {
}

/**
 * Writes the version reported by {@code jobPersistence} into a newly created
 * {@code VERSION_FILE_NAME} file inside the given folder.
 *
 * @param tempFolder directory in which the version file is created
 * @throws IOException if the file cannot be created (for instance because it already exists) or
 *         cannot be written
 */
private void exportVersionFile(Path tempFolder) throws IOException {
  final Path versionPath = tempFolder.resolve(VERSION_FILE_NAME);
  // Resolve the version before touching the file system, so that a missing version fails
  // without leaving an empty file behind.
  final String currentVersion = jobPersistence.getVersion().orElseThrow();
  final Path createdFile = Files.createFile(versionPath);
  FileUtils.writeStringToFile(createdFile.toFile(), currentVersion, Charset.defaultCharset());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.api.model.ImportRead;
import io.airbyte.api.model.ImportRead.StatusEnum;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.Archives;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -71,28 +73,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfigDumpImport {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class ConfigDumpImporter {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigDumpImport.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigDumpImporter.class);
private static final String CONFIG_FOLDER_NAME = "airbyte_config";
private static final String DB_FOLDER_NAME = "airbyte_db";
private static final String VERSION_FILE_NAME = "VERSION";
private final ConfigRepository configRepository;
private final JsonSchemaValidator jsonSchemaValidator;
private final JobPersistence postgresPersistence;
private final String targetVersion;
private final String initialVersion;
private final Path latestSeed;

public ConfigDumpImport(String initialVersion,
String targetVersion,
Path latestSeed,
JobPersistence postgresPersistence,
ConfigRepository configRepository) {
this.targetVersion = targetVersion;
this.initialVersion = initialVersion;
this.latestSeed = latestSeed;
this.jsonSchemaValidator = new JsonSchemaValidator();

/**
 * Creates an importer that uses a newly constructed {@link JsonSchemaValidator}; delegates to the
 * three-argument constructor.
 *
 * @param configRepository repository the imported configs are written to
 * @param postgresPersistence job persistence used for the database part of the import
 */
public ConfigDumpImporter(ConfigRepository configRepository, JobPersistence postgresPersistence) {
this(configRepository, postgresPersistence, new JsonSchemaValidator());
}

/**
 * Creates an importer with an explicitly supplied validator. Exposed so tests can inject their
 * own {@link JsonSchemaValidator}.
 *
 * @param configRepository repository the imported configs are written to
 * @param postgresPersistence job persistence used for the database part of the import
 * @param jsonSchemaValidator validator instance stored on this importer
 */
@VisibleForTesting
public ConfigDumpImporter(
                          ConfigRepository configRepository,
                          JobPersistence postgresPersistence,
                          JsonSchemaValidator jsonSchemaValidator) {
  // Assignments follow the parameter order; they are independent of one another.
  this.configRepository = configRepository;
  this.postgresPersistence = postgresPersistence;
  this.jsonSchemaValidator = jsonSchemaValidator;
}
Expand All @@ -109,7 +107,17 @@ public Optional<UUID> getCurrentCustomerId() {
}
}

public ImportRead importData(File archive) {
/**
 * Imports the given archive without merging any seed data; only the data contained in the
 * archive is used.
 *
 * @param targetVersion version forwarded to the internal import routine
 * @param archive archive file to import
 * @return the result reported by the internal import routine
 */
public ImportRead importData(String targetVersion, File archive) {
  final Optional<Path> noSeed = Optional.empty();
  return importDataInternal(targetVersion, archive, noSeed);
}

/**
 * Imports the given archive and passes along a seed directory to be merged with the import.
 *
 * @param targetVersion version forwarded to the internal import routine
 * @param archive archive file to import
 * @param seedPath seed directory; must not be {@code null} because it is wrapped with
 *        {@link Optional#of(Object)}
 * @return the result reported by the internal import routine
 */
public ImportRead importDataWithSeed(String targetVersion, File archive, Path seedPath) {
  final Optional<Path> seed = Optional.of(seedPath);
  return importDataInternal(targetVersion, archive, seed);
}

// seedPath - if present, the seed is merged with the import; otherwise only the data in the import is used.
private ImportRead importDataInternal(String targetVersion, File archive, Optional<Path> seedPath) {
Preconditions.checkNotNull(seedPath);

final Optional<UUID> previousCustomerIdOptional = getCurrentCustomerId();
ImportRead result;
Expand All @@ -121,18 +129,17 @@ public ImportRead importData(File archive) {

// 2. dry run
try {
checkImport(sourceRoot);
checkImport(targetVersion, sourceRoot, seedPath);
} catch (Exception e) {
LOGGER.warn("Dry run failed, setting DB version back to initial version");
postgresPersistence.setVersion(initialVersion);
LOGGER.error("Dry run failed.", e);
throw e;
}

// 3. Import Postgres content
importDatabaseFromArchive(sourceRoot, targetVersion);

// 4. Import Configs
importConfigsFromArchive(sourceRoot, false);
importConfigsFromArchive(sourceRoot, seedPath, false);

// 5. Set DB version
LOGGER.info("Setting the DB Airbyte version to : " + targetVersion);
Expand All @@ -158,7 +165,7 @@ public ImportRead importData(File archive) {
return result;
}

private void checkImport(Path tempFolder) throws IOException, JsonValidationException {
private void checkImport(String targetVersion, Path tempFolder, Optional<Path> seed) throws IOException, JsonValidationException {
final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME);
final String importVersion = Files.readString(versionFile, Charset.defaultCharset())
.replace("\n", "").strip();
Expand All @@ -169,7 +176,7 @@ private void checkImport(Path tempFolder) throws IOException, JsonValidationExce
"Please upgrade your Airbyte Archive, see more at https://docs.airbyte.io/tutorials/upgrading-airbyte\n",
importVersion, targetVersion));
}
importConfigsFromArchive(tempFolder, true);
importConfigsFromArchive(tempFolder, seed, true);
}

// Config
Expand All @@ -180,39 +187,58 @@ private List<String> listDirectories(Path sourceRoot) throws IOException {
}
}

private <T> void importConfigsFromArchive(final Path sourceRoot, final boolean dryRun) throws IOException, JsonValidationException {
List<String> sourceDefinitionsToMigrate = new ArrayList<>();
List<String> destinationDefinitionsToMigrate = new ArrayList<>();
private <T> void importConfigsFromArchive(final Path sourceRoot, Optional<Path> seedPath, final boolean dryRun)
throws IOException, JsonValidationException {
final List<String> sourceDefinitionsToMigrate = new ArrayList<>();
final List<String> destinationDefinitionsToMigrate = new ArrayList<>();
final boolean[] sourceProcessed = {false};
final boolean[] destinationProcessed = {false};
List<String> directories = listDirectories(sourceRoot);
final List<String> directories = listDirectories(sourceRoot);
// We sort the directories because we want to process SOURCE_CONNECTION before
// STANDARD_SOURCE_DEFINITION and DESTINATION_CONNECTION before STANDARD_DESTINATION_DEFINITION,
// so that we can identify which definitions should not be upgraded to the latest version.
Collections.sort(directories);
Map<ConfigSchema, Stream<T>> data = new LinkedHashMap<>();
Map<ConfigSchema, Map<String, T>> latestSeeds = latestSeeds();
final Map<ConfigSchema, Stream<T>> data = new LinkedHashMap<>();

final Map<ConfigSchema, Map<String, T>> seed;
if (seedPath.isPresent()) {
seed = getSeed(seedPath.get());
} else {
seed = new HashMap<>();
}
for (String directory : directories) {
ConfigSchema configSchema = ConfigSchema.valueOf(directory.replace(".yaml", ""));
final Optional<ConfigSchema> configSchemaOptional = Enums.toEnum(directory.replace(".yaml", ""), ConfigSchema.class);

if (configSchemaOptional.isEmpty()) {
continue;
}

final ConfigSchema configSchema = configSchemaOptional.get();
Stream<T> configs = readConfigsFromArchive(sourceRoot, configSchema);
configs = streamWithAdditionalOperation(sourceDefinitionsToMigrate, destinationDefinitionsToMigrate, sourceProcessed, destinationProcessed,
configSchema, configs, latestSeeds);
configs = streamWithAdditionalOperation(
sourceDefinitionsToMigrate,
destinationDefinitionsToMigrate,
sourceProcessed,
destinationProcessed,
configSchema,
configs,
seed);
data.put(configSchema, configs);
}
configRepository.replaceAllConfigs(data, dryRun);
}

private <T> Map<ConfigSchema, Map<String, T>> latestSeeds() throws IOException {
List<ConfigSchema> configSchemas = Files.list(latestSeed).map(c -> ConfigSchema.valueOf(c.getFileName().toString())).collect(Collectors.toList());
Map<ConfigSchema, Map<String, T>> allData = new HashMap<>();
/**
 * Loads every config type found directly under the seed directory.
 *
 * <p>Each entry name in {@code seed} is converted with {@code ConfigSchema.valueOf}, so an entry
 * whose name is not a {@code ConfigSchema} constant makes this method throw
 * {@link IllegalArgumentException}.
 *
 * @param seed directory whose direct children are named after {@code ConfigSchema} constants
 * @return the data returned by {@code readLatestSeed} for each config type found in the seed
 * @throws IOException if the seed directory or one of its config directories cannot be listed
 */
private <T> Map<ConfigSchema, Map<String, T>> getSeed(Path seed) throws IOException {
  final List<ConfigSchema> configSchemas;
  // Files.list holds an open directory handle until the stream is closed, so it must be
  // wrapped in try-with-resources (as readLatestSeed already does).
  try (Stream<Path> seedEntries = Files.list(seed)) {
    configSchemas = seedEntries
        .map(c -> ConfigSchema.valueOf(c.getFileName().toString()))
        .collect(Collectors.toList());
  }

  final Map<ConfigSchema, Map<String, T>> allData = new HashMap<>();
  for (ConfigSchema configSchema : configSchemas) {
    final Map<String, T> data = readLatestSeed(seed, configSchema);
    allData.put(configSchema, data);
  }
  return allData;
}

private <T> Map<String, T> readLatestSeed(ConfigSchema configSchema) throws IOException {
private <T> Map<String, T> readLatestSeed(Path latestSeed, ConfigSchema configSchema) throws IOException {
try (Stream<Path> files = Files.list(latestSeed.resolve(configSchema.toString()))) {
final List<String> ids = files
.filter(p -> !p.endsWith(".json"))
Expand Down Expand Up @@ -274,7 +300,7 @@ private <T> Stream<T> getDefinitionStream(List<String> definitionsToMigrate,
}

return Streams.concat(configs.filter(c -> definitionsToMigrate.contains(configSchema.getId(c))),
latestSeeds.get(configSchema).entrySet().stream().filter(c -> !definitionsToMigrate.contains(c.getKey()))
latestSeeds.getOrDefault(configSchema, new HashMap<>()).entrySet().stream().filter(c -> !definitionsToMigrate.contains(c.getKey()))
.map(Entry::getValue));
}

Expand Down Expand Up @@ -314,8 +340,7 @@ protected static Path buildConfigPath(final Path storageRoot, final ConfigSchema
}

// Postgres Portion
public void importDatabaseFromArchive(final Path storageRoot, final String airbyteVersion)
throws IOException {
public void importDatabaseFromArchive(final Path storageRoot, final String airbyteVersion) throws IOException {
try {
final Map<DatabaseSchema, Stream<JsonNode>> data = new HashMap<>();
for (DatabaseSchema tableType : DatabaseSchema.values()) {
Expand All @@ -331,8 +356,7 @@ public void importDatabaseFromArchive(final Path storageRoot, final String airby
postgresPersistence.importDatabase(airbyteVersion, data);
LOGGER.info("Successful upgrade of airbyte postgres database from archive");
} catch (Exception e) {
LOGGER.warn("Postgres database version upgrade failed, setting DB version back to initial version");
postgresPersistence.setVersion(initialVersion);
LOGGER.warn("Postgres database version upgrade failed, reverting to state previous to migration.");
throw e;
}
}
Expand Down
25 changes: 12 additions & 13 deletions airbyte-server/src/main/java/io/airbyte/server/RunMigration.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,41 +44,40 @@ public class RunMigration implements Runnable, AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(RunMigration.class);
private final String targetVersion;
private final ConfigDumpExport configDumpExport;
private final ConfigDumpImport configDumpImport;
private final Path seedPath;
private final ConfigDumpExporter configDumpExporter;
private final ConfigDumpImporter configDumpImporter;
private final List<File> filesToBeCleanedUp = new ArrayList<>();

public RunMigration(String initialVersion,
JobPersistence jobPersistence,
public RunMigration(JobPersistence jobPersistence,
ConfigRepository configRepository,
String targetVersion,
Path latestSeeds) {
Path seedPath) {
this.targetVersion = targetVersion;
this.configDumpExport = new ConfigDumpExport(configRepository, jobPersistence, initialVersion);
this.configDumpImport = new ConfigDumpImport(initialVersion, targetVersion, latestSeeds, jobPersistence, configRepository);
this.seedPath = seedPath;
this.configDumpExporter = new ConfigDumpExporter(configRepository, jobPersistence);
this.configDumpImporter = new ConfigDumpImporter(configRepository, jobPersistence);
}

@Override
public void run() {
try {
// Export data
File exportData = configDumpExport.dump();
File exportData = configDumpExporter.dump();
filesToBeCleanedUp.add(exportData);

// Define output target
final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive_output");
final File output = Files.createTempFile(tempFolder, "airbyte_archive_output", ".tar.gz")
.toFile();
final File output = Files.createTempFile(tempFolder, "airbyte_archive_output", ".tar.gz").toFile();
filesToBeCleanedUp.add(output);
filesToBeCleanedUp.add(tempFolder.toFile());

// Run Migration
MigrateConfig migrateConfig = new MigrateConfig(exportData.toPath(), output.toPath(),
targetVersion);
MigrateConfig migrateConfig = new MigrateConfig(exportData.toPath(), output.toPath(), targetVersion);
MigrationRunner.run(migrateConfig);

// Import data
ImportRead importRead = configDumpImport.importData(output);
ImportRead importRead = configDumpImporter.importDataWithSeed(targetVersion, output, seedPath);
if (importRead.getStatus() == StatusEnum.FAILED) {
throw new RuntimeException("Automatic migration failed : " + importRead.getReason());
}
Expand Down
7 changes: 5 additions & 2 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,11 @@ private static void runAutomaticMigration(ConfigRepository configRepository,
LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion + " to version : " + airbyteVersion);
final Path latestSeedsPath = Path.of(System.getProperty("user.dir")).resolve("latest_seeds");
LOGGER.info("Last seeds dir: {}", latestSeedsPath);
try (RunMigration runMigration = new RunMigration(airbyteDatabaseVersion,
jobPersistence, configRepository, airbyteVersion, latestSeedsPath)) {
try (final RunMigration runMigration = new RunMigration(
jobPersistence,
configRepository,
airbyteVersion,
latestSeedsPath)) {
runMigration.run();
} catch (Exception e) {
LOGGER.error("Automatic Migration failed ", e);
Expand Down
Loading