Skip to content

Commit

Permalink
Wait for config volume to be ready (#4835)
Browse files Browse the repository at this point in the history
* Do not create config directory in fs persistence construction

* Run kube acceptance test only for testing purpose

* Wait for config volume to be ready

* Move config volume wait for fs persistence construction

* Restore ci workflow

* Prune imports
  • Loading branch information
tuliren authored Jul 20, 2021
1 parent 35806d9 commit e9a6702
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import java.io.IOException;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,14 +54,14 @@ public class ConfigPersistenceBuilder {
/**
* Create a db config persistence and setup the database, including table creation and data loading.
*/
public static ConfigPersistence getAndInitializeDbPersistence(Configs configs) throws IOException {
public static ConfigPersistence getAndInitializeDbPersistence(Configs configs) throws IOException, InterruptedException {
return new ConfigPersistenceBuilder(configs, true).create();
}

/**
* Create a db config persistence without setting up the database.
*/
public static ConfigPersistence getDbPersistence(Configs configs) throws IOException {
public static ConfigPersistence getDbPersistence(Configs configs) throws IOException, InterruptedException {
return new ConfigPersistenceBuilder(configs, false).create();
}

Expand All @@ -71,20 +70,14 @@ public static ConfigPersistence getDbPersistence(Configs configs) throws IOExcep
* database config persistence and copy the configs from the file-based config persistence.
* Otherwise, seed the database from the yaml files.
*/
ConfigPersistence create() throws IOException {
ConfigPersistence create() throws IOException, InterruptedException {
// Uncomment this branch in a future version when config volume is removed.
// if (configs.getConfigRoot() == null) {
// return getDbPersistenceWithYamlSeed();
// }
return getDbPersistenceWithFileSeed();
}

ConfigPersistence getFileSystemPersistence() throws IOException {
Path configRoot = configs.getConfigRoot();
LOGGER.info("Use file system config persistence (root: {})", configRoot);
return FileSystemConfigPersistence.createWithValidation(configRoot);
}

/**
* Create the database config persistence and load it with the initial seed from the YAML seed files
* if the database should be initialized.
Expand All @@ -99,10 +92,9 @@ ConfigPersistence getDbPersistenceWithYamlSeed() throws IOException {
* Create the database config persistence and load it with the existing configs from the file system
* config persistence if the database should be initialized.
*/
ConfigPersistence getDbPersistenceWithFileSeed() throws IOException {
ConfigPersistence getDbPersistenceWithFileSeed() throws IOException, InterruptedException {
LOGGER.info("Creating db-based config persistence, and loading seed and existing data from files");
Path configRoot = configs.getConfigRoot();
ConfigPersistence fsConfigPersistence = FileSystemConfigPersistence.createWithValidation(configRoot);
ConfigPersistence fsConfigPersistence = FileSystemConfigPersistence.createWithValidation(configs.getConfigRoot());
return getDbPersistence(fsConfigPersistence);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@
public class FileSystemConfigPersistence implements ConfigPersistence {

private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemConfigPersistence.class);
private static final String CONFIG_DIR = "config";
public static final String CONFIG_DIR = "config";
private static final String TMP_DIR = "tmp_storage";
private static final int INTERVAL_WAITING_SECONDS = 3;

private static final Object lock = new Object();

Expand All @@ -57,14 +58,24 @@ public class FileSystemConfigPersistence implements ConfigPersistence {
// root for where configs are stored
private final Path configRoot;

public static ConfigPersistence createWithValidation(final Path storageRoot) throws IOException {
public static ConfigPersistence createWithValidation(final Path storageRoot) throws InterruptedException {
LOGGER.info("Constructing file system config persistence (root: {})", storageRoot);

Path configRoot = storageRoot.resolve(CONFIG_DIR);
int totalWaitingSeconds = 0;
while (!Files.exists(configRoot)) {
LOGGER.warn("Config volume is not ready yet (waiting time: {} s)", totalWaitingSeconds);
Thread.sleep(INTERVAL_WAITING_SECONDS * 1000);
totalWaitingSeconds += INTERVAL_WAITING_SECONDS;
}
LOGGER.info("Config volume is ready (waiting time: {} s)", totalWaitingSeconds);

return new ValidatingConfigPersistence(new FileSystemConfigPersistence(storageRoot));
}

public FileSystemConfigPersistence(final Path storageRoot) throws IOException {
public FileSystemConfigPersistence(final Path storageRoot) {
this.storageRoot = storageRoot;
this.configRoot = storageRoot.resolve(CONFIG_DIR);
Files.createDirectories(configRoot);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,20 +155,6 @@ public void testCreateDbPersistenceWithoutSetupDatabase() throws Exception {
dbPersistence.dumpConfigs());
}

@Test
public void testCreateFileSystemConfigPersistence() throws Exception {
Path testRoot = Path.of("/tmp/cpf_test_file_system");
Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName());
ConfigPersistence seedPersistence = new FileSystemConfigPersistence(rootPath);
writeSource(seedPersistence, SOURCE_GITHUB);
writeDestination(seedPersistence, DESTINATION_S3);

when(configs.getConfigRoot()).thenReturn(rootPath);

ConfigPersistence filePersistence = new ConfigPersistenceBuilder(configs, false).getFileSystemPersistence();
assertSameConfigDump(seedPersistence.dumpConfigs(), filePersistence.dumpConfigs());
}

/**
* This test mimics the file -> db config persistence migration process.
*/
Expand All @@ -187,10 +173,11 @@ public void testMigrateFromFileToDbPersistence() throws Exception {

// first run uses file system config persistence, and adds an extra workspace
Path testRoot = Path.of("/tmp/cpf_test_migration");
Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName());
when(configs.getConfigRoot()).thenReturn(rootPath);
Path storageRoot = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName());
Files.createDirectories(storageRoot.resolve(FileSystemConfigPersistence.CONFIG_DIR));
when(configs.getConfigRoot()).thenReturn(storageRoot);

ConfigPersistence filePersistence = new ConfigPersistenceBuilder(configs, false).getFileSystemPersistence();
ConfigPersistence filePersistence = FileSystemConfigPersistence.createWithValidation(storageRoot);

filePersistence.replaceAllConfigs(seedConfigs, false);
filePersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, extraWorkspace.getWorkspaceId().toString(), extraWorkspace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void testRunMigration() {
}
}

private void assertPreMigrationConfigs(Path configRoot, JobPersistence jobPersistence) throws IOException, JsonValidationException {
private void assertPreMigrationConfigs(Path configRoot, JobPersistence jobPersistence) throws Exception {
assertDatabaseVersion(jobPersistence, INITIAL_VERSION);
ConfigRepository configRepository = new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot));
Map<String, StandardSourceDefinition> sourceDefinitionsBeforeMigration = configRepository.listStandardSources().stream()
Expand All @@ -132,7 +132,7 @@ private void assertDatabaseVersion(JobPersistence jobPersistence, String version
assertEquals(versionFromDb.get(), version);
}

private void assertPostMigrationConfigs(Path importRoot) throws IOException, JsonValidationException, ConfigNotFoundException {
private void assertPostMigrationConfigs(Path importRoot) throws Exception {
final ConfigRepository configRepository = new ConfigRepository(FileSystemConfigPersistence.createWithValidation(importRoot));
final StandardSyncOperation standardSyncOperation = assertSyncOperations(configRepository);
assertStandardSyncs(configRepository, standardSyncOperation);
Expand Down Expand Up @@ -293,7 +293,7 @@ private void assertDestinations(ConfigRepository configRepository) throws JsonVa
}
}

private void runMigration(JobPersistence jobPersistence, Path configRoot) throws IOException {
private void runMigration(JobPersistence jobPersistence, Path configRoot) throws Exception {
try (RunMigration runMigration = new RunMigration(
INITIAL_VERSION,
jobPersistence,
Expand Down

0 comments on commit e9a6702

Please sign in to comment.