diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java index 266b5f8203df..1d5ddce40ab1 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java @@ -31,7 +31,6 @@ import io.airbyte.db.Database; import io.airbyte.db.Databases; import java.io.IOException; -import java.nio.file.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,14 +54,14 @@ public class ConfigPersistenceBuilder { /** * Create a db config persistence and setup the database, including table creation and data loading. */ - public static ConfigPersistence getAndInitializeDbPersistence(Configs configs) throws IOException { + public static ConfigPersistence getAndInitializeDbPersistence(Configs configs) throws IOException, InterruptedException { return new ConfigPersistenceBuilder(configs, true).create(); } /** * Create a db config persistence without setting up the database. */ - public static ConfigPersistence getDbPersistence(Configs configs) throws IOException { + public static ConfigPersistence getDbPersistence(Configs configs) throws IOException, InterruptedException { return new ConfigPersistenceBuilder(configs, false).create(); } @@ -71,7 +70,7 @@ public static ConfigPersistence getDbPersistence(Configs configs) throws IOExcep * database config persistence and copy the configs from the file-based config persistence. * Otherwise, seed the database from the yaml files. */ - ConfigPersistence create() throws IOException { + ConfigPersistence create() throws IOException, InterruptedException { // Uncomment this branch in a future version when config volume is removed. // if (configs.getConfigRoot() == null) { // return getDbPersistenceWithYamlSeed(); @@ -79,12 +78,6 @@ ConfigPersistence create() throws IOException { return getDbPersistenceWithFileSeed(); } - ConfigPersistence getFileSystemPersistence() throws IOException { - Path configRoot = configs.getConfigRoot(); - LOGGER.info("Use file system config persistence (root: {})", configRoot); - return FileSystemConfigPersistence.createWithValidation(configRoot); - } - /** * Create the database config persistence and load it with the initial seed from the YAML seed files * if the database should be initialized. @@ -99,10 +92,9 @@ ConfigPersistence getDbPersistenceWithYamlSeed() throws IOException { * Create the database config persistence and load it with the existing configs from the file system * config persistence if the database should be initialized. */ - ConfigPersistence getDbPersistenceWithFileSeed() throws IOException { + ConfigPersistence getDbPersistenceWithFileSeed() throws IOException, InterruptedException { LOGGER.info("Creating db-based config persistence, and loading seed and existing data from files"); - Path configRoot = configs.getConfigRoot(); - ConfigPersistence fsConfigPersistence = FileSystemConfigPersistence.createWithValidation(configRoot); + ConfigPersistence fsConfigPersistence = FileSystemConfigPersistence.createWithValidation(configs.getConfigRoot()); return getDbPersistence(fsConfigPersistence); } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java index 4bc1cad3968a..2b8767143b98 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java @@ -47,8 +47,9 @@ public class FileSystemConfigPersistence implements ConfigPersistence { private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemConfigPersistence.class); - private static final String CONFIG_DIR = "config"; + public static final String CONFIG_DIR = "config"; private static final String TMP_DIR = "tmp_storage"; + private static final int INTERVAL_WAITING_SECONDS = 3; private static final Object lock = new Object(); @@ -57,14 +58,24 @@ public class FileSystemConfigPersistence implements ConfigPersistence { // root for where configs are stored private final Path configRoot; - public static ConfigPersistence createWithValidation(final Path storageRoot) throws IOException { + public static ConfigPersistence createWithValidation(final Path storageRoot) throws InterruptedException { + LOGGER.info("Constructing file system config persistence (root: {})", storageRoot); + + Path configRoot = storageRoot.resolve(CONFIG_DIR); + int totalWaitingSeconds = 0; + while (!Files.exists(configRoot)) { + LOGGER.warn("Config volume is not ready yet (waiting time: {} s)", totalWaitingSeconds); + Thread.sleep(INTERVAL_WAITING_SECONDS * 1000); + totalWaitingSeconds += INTERVAL_WAITING_SECONDS; + } + LOGGER.info("Config volume is ready (waiting time: {} s)", totalWaitingSeconds); + return new ValidatingConfigPersistence(new FileSystemConfigPersistence(storageRoot)); } - public FileSystemConfigPersistence(final Path storageRoot) throws IOException { + public FileSystemConfigPersistence(final Path storageRoot) { this.storageRoot = storageRoot; this.configRoot = storageRoot.resolve(CONFIG_DIR); - Files.createDirectories(configRoot); } @Override diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java index 40cd7f78a433..79189f899d89 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java @@ -155,20 +155,6 @@ public void testCreateDbPersistenceWithoutSetupDatabase() throws Exception { dbPersistence.dumpConfigs()); } - @Test - public void testCreateFileSystemConfigPersistence() throws Exception { - Path testRoot = Path.of("/tmp/cpf_test_file_system"); - Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName()); - ConfigPersistence seedPersistence = new FileSystemConfigPersistence(rootPath); - writeSource(seedPersistence, SOURCE_GITHUB); - writeDestination(seedPersistence, DESTINATION_S3); - - when(configs.getConfigRoot()).thenReturn(rootPath); - - ConfigPersistence filePersistence = new ConfigPersistenceBuilder(configs, false).getFileSystemPersistence(); - assertSameConfigDump(seedPersistence.dumpConfigs(), filePersistence.dumpConfigs()); - } - /** * This test mimics the file -> db config persistence migration process. */ @@ -187,10 +173,11 @@ public void testMigrateFromFileToDbPersistence() throws Exception { // first run uses file system config persistence, and adds an extra workspace Path testRoot = Path.of("/tmp/cpf_test_migration"); - Path rootPath = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName()); - when(configs.getConfigRoot()).thenReturn(rootPath); + Path storageRoot = Files.createTempDirectory(Files.createDirectories(testRoot), ConfigPersistenceBuilderTest.class.getName()); + Files.createDirectories(storageRoot.resolve(FileSystemConfigPersistence.CONFIG_DIR)); + when(configs.getConfigRoot()).thenReturn(storageRoot); - ConfigPersistence filePersistence = new ConfigPersistenceBuilder(configs, false).getFileSystemPersistence(); + ConfigPersistence filePersistence = FileSystemConfigPersistence.createWithValidation(storageRoot); filePersistence.replaceAllConfigs(seedConfigs, false); filePersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, extraWorkspace.getWorkspaceId().toString(), extraWorkspace); diff --git a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java index 0d733b765aea..7aa6447d1bcf 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java @@ -117,7 +117,7 @@ public void testRunMigration() { } } - private void assertPreMigrationConfigs(Path configRoot, JobPersistence jobPersistence) throws IOException, JsonValidationException { + private void assertPreMigrationConfigs(Path configRoot, JobPersistence jobPersistence) throws Exception { assertDatabaseVersion(jobPersistence, INITIAL_VERSION); ConfigRepository configRepository = new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot)); Map sourceDefinitionsBeforeMigration = configRepository.listStandardSources().stream() @@ -132,7 +132,7 @@ private void assertDatabaseVersion(JobPersistence jobPersistence, String version assertEquals(versionFromDb.get(), version); } - private void assertPostMigrationConfigs(Path importRoot) throws IOException, JsonValidationException, ConfigNotFoundException { + private void assertPostMigrationConfigs(Path importRoot) throws Exception { final ConfigRepository configRepository = new ConfigRepository(FileSystemConfigPersistence.createWithValidation(importRoot)); final StandardSyncOperation standardSyncOperation = assertSyncOperations(configRepository); assertStandardSyncs(configRepository, standardSyncOperation); @@ -293,7 +293,7 @@ private void assertDestinations(ConfigRepository configRepository) throws JsonVa } } - private void runMigration(JobPersistence jobPersistence, Path configRoot) throws IOException { + private void runMigration(JobPersistence jobPersistence, Path configRoot) throws Exception { try (RunMigration runMigration = new RunMigration( INITIAL_VERSION, jobPersistence,