Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for config volume to be ready #4835

Merged
merged 6 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import java.io.IOException;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,14 +54,14 @@ public class ConfigPersistenceBuilder {
/**
* Create a db config persistence and setup the database, including table creation and data loading.
*/
public static ConfigPersistence getAndInitializeDbPersistence(Configs configs) throws IOException {
public static ConfigPersistence getAndInitializeDbPersistence(Configs configs) throws IOException, InterruptedException {
return new ConfigPersistenceBuilder(configs, true).create();
}

/**
* Create a db config persistence without setting up the database.
*/
public static ConfigPersistence getDbPersistence(Configs configs) throws IOException {
public static ConfigPersistence getDbPersistence(Configs configs) throws IOException, InterruptedException {
return new ConfigPersistenceBuilder(configs, false).create();
}

Expand All @@ -71,20 +70,14 @@ public static ConfigPersistence getDbPersistence(Configs configs) throws IOExcep
* database config persistence and copy the configs from the file-based config persistence.
* Otherwise, seed the database from the yaml files.
*/
ConfigPersistence create() throws IOException {
ConfigPersistence create() throws IOException, InterruptedException {
// Uncomment this branch in a future version when config volume is removed.
// if (configs.getConfigRoot() == null) {
// return getDbPersistenceWithYamlSeed();
// }
return getDbPersistenceWithFileSeed();
}

ConfigPersistence getFileSystemPersistence() throws IOException {
Path configRoot = configs.getConfigRoot();
LOGGER.info("Use file system config persistence (root: {})", configRoot);
return FileSystemConfigPersistence.createWithValidation(configRoot);
}

/**
* Create the database config persistence and load it with the initial seed from the YAML seed files
* if the database should be initialized.
Expand All @@ -99,10 +92,9 @@ ConfigPersistence getDbPersistenceWithYamlSeed() throws IOException {
* Create the database config persistence and load it with the existing configs from the file system
* config persistence if the database should be initialized.
*/
ConfigPersistence getDbPersistenceWithFileSeed() throws IOException {
ConfigPersistence getDbPersistenceWithFileSeed() throws IOException, InterruptedException {
LOGGER.info("Creating db-based config persistence, and loading seed and existing data from files");
Path configRoot = configs.getConfigRoot();
ConfigPersistence fsConfigPersistence = FileSystemConfigPersistence.createWithValidation(configRoot);
ConfigPersistence fsConfigPersistence = FileSystemConfigPersistence.createWithValidation(configs.getConfigRoot());
return getDbPersistence(fsConfigPersistence);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@
public class FileSystemConfigPersistence implements ConfigPersistence {

private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemConfigPersistence.class);
private static final String CONFIG_DIR = "config";
public static final String CONFIG_DIR = "config";
private static final String TMP_DIR = "tmp_storage";
private static final int INTERVAL_WAITING_SECONDS = 3;

private static final Object lock = new Object();

Expand All @@ -57,14 +58,24 @@ public class FileSystemConfigPersistence implements ConfigPersistence {
// root for where configs are stored
private final Path configRoot;

public static ConfigPersistence createWithValidation(final Path storageRoot) throws IOException {
public static ConfigPersistence createWithValidation(final Path storageRoot) throws InterruptedException {
LOGGER.info("Constructing file system config persistence (root: {})", storageRoot);

Path configRoot = storageRoot.resolve(CONFIG_DIR);
int totalWaitingSeconds = 0;
while (!Files.exists(configRoot)) {
LOGGER.warn("Config volume is not ready yet (waiting time: {} s)", totalWaitingSeconds);
Thread.sleep(INTERVAL_WAITING_SECONDS * 1000);
totalWaitingSeconds += INTERVAL_WAITING_SECONDS;
}
LOGGER.info("Config volume is ready (waiting time: {} s)", totalWaitingSeconds);

return new ValidatingConfigPersistence(new FileSystemConfigPersistence(storageRoot));
}

public FileSystemConfigPersistence(final Path storageRoot) throws IOException {
public FileSystemConfigPersistence(final Path storageRoot) {
this.storageRoot = storageRoot;
this.configRoot = storageRoot.resolve(CONFIG_DIR);
Files.createDirectories(configRoot);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added by me in #4670. But I should not have done that. This statement covered the corner case that sometimes the config volume is not ready when server app starts, at least in kube acceptance test.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,20 +155,6 @@ public void testCreateDbPersistenceWithoutSetupDatabase() throws Exception {
dbPersistence.dumpConfigs());
}

@Test
public void testCreateFileSystemConfigPersistence() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why delete this test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ConfigPersistenceBuilder#getFileSystemPersistence method is redundant. So this method is deleted, together with this test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense!

Path testRoot = Path.of("/tmp/cpf_test_file_system");
Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName());
ConfigPersistence seedPersistence = new FileSystemConfigPersistence(rootPath);
writeSource(seedPersistence, SOURCE_GITHUB);
writeDestination(seedPersistence, DESTINATION_S3);

when(configs.getConfigRoot()).thenReturn(rootPath);

ConfigPersistence filePersistence = new ConfigPersistenceBuilder(configs, false).getFileSystemPersistence();
assertSameConfigDump(seedPersistence.dumpConfigs(), filePersistence.dumpConfigs());
}

/**
* This test mimics the file -> db config persistence migration process.
*/
Expand All @@ -187,10 +173,11 @@ public void testMigrateFromFileToDbPersistence() throws Exception {

// first run uses file system config persistence, and adds an extra workspace
Path testRoot = Path.of("/tmp/cpf_test_migration");
Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName());
when(configs.getConfigRoot()).thenReturn(rootPath);
Path storageRoot = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName());
Files.createDirectories(storageRoot.resolve(FileSystemConfigPersistence.CONFIG_DIR));
when(configs.getConfigRoot()).thenReturn(storageRoot);

ConfigPersistence filePersistence = new ConfigPersistenceBuilder(configs, false).getFileSystemPersistence();
ConfigPersistence filePersistence = FileSystemConfigPersistence.createWithValidation(storageRoot);

filePersistence.replaceAllConfigs(seedConfigs, false);
filePersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, extraWorkspace.getWorkspaceId().toString(), extraWorkspace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void testRunMigration() {
}
}

private void assertPreMigrationConfigs(Path configRoot, JobPersistence jobPersistence) throws IOException, JsonValidationException {
private void assertPreMigrationConfigs(Path configRoot, JobPersistence jobPersistence) throws Exception {
assertDatabaseVersion(jobPersistence, INITIAL_VERSION);
ConfigRepository configRepository = new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot));
Map<String, StandardSourceDefinition> sourceDefinitionsBeforeMigration = configRepository.listStandardSources().stream()
Expand All @@ -132,7 +132,7 @@ private void assertDatabaseVersion(JobPersistence jobPersistence, String version
assertEquals(versionFromDb.get(), version);
}

private void assertPostMigrationConfigs(Path importRoot) throws IOException, JsonValidationException, ConfigNotFoundException {
private void assertPostMigrationConfigs(Path importRoot) throws Exception {
final ConfigRepository configRepository = new ConfigRepository(FileSystemConfigPersistence.createWithValidation(importRoot));
final StandardSyncOperation standardSyncOperation = assertSyncOperations(configRepository);
assertStandardSyncs(configRepository, standardSyncOperation);
Expand Down Expand Up @@ -293,7 +293,7 @@ private void assertDestinations(ConfigRepository configRepository) throws JsonVa
}
}

private void runMigration(JobPersistence jobPersistence, Path configRoot) throws IOException {
private void runMigration(JobPersistence jobPersistence, Path configRoot) throws Exception {
try (RunMigration runMigration = new RunMigration(
INITIAL_VERSION,
jobPersistence,
Expand Down