From 89fcf80cb3bd75fdc91c392f62e4111044f800c4 Mon Sep 17 00:00:00 2001 From: cgardens Date: Thu, 23 Sep 2021 13:26:37 -0700 Subject: [PATCH 1/3] add initialize to YamlSeedConfigPersistence --- .../YamlSeedConfigPersistence.java | 49 +++++++++++-------- .../BaseDatabaseConfigPersistenceTest.java | 26 +++++----- ...DatabaseConfigPersistenceLoadDataTest.java | 11 +++-- .../DatabaseConfigPersistenceTest.java | 48 +++++++++--------- ...istenceUpdateConnectorDefinitionsTest.java | 40 +++++++-------- .../YamlSeedConfigPersistenceTest.java | 11 ++++- .../java/io/airbyte/server/ServerFactory.java | 20 +++++--- .../server/handlers/ArchiveHandlerTest.java | 33 +++++++------ 8 files changed, 132 insertions(+), 106 deletions(-) diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java index 8bbff9c06215..4d451f00ae78 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java @@ -48,36 +48,45 @@ */ public class YamlSeedConfigPersistence implements ConfigPersistence { + public static Class DEFAULT_SEED_DEFINITION_RESOURCE_CLASS = SeedType.class; + private static final Map CONFIG_SCHEMA_MAP = Map.of( ConfigSchema.STANDARD_SOURCE_DEFINITION, SeedType.STANDARD_SOURCE_DEFINITION, ConfigSchema.STANDARD_DESTINATION_DEFINITION, SeedType.STANDARD_DESTINATION_DEFINITION); - private static final YamlSeedConfigPersistence INSTANCE; - static { - try { - INSTANCE = new YamlSeedConfigPersistence(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + private static final Object lock = new Object(); + private static YamlSeedConfigPersistence INSTANCE; // A mapping from seed config type to config UUID to config. private final ImmutableMap> allSeedConfigs; - private YamlSeedConfigPersistence() throws IOException { + private YamlSeedConfigPersistence(final Class seedDefinitionsResourceClass) throws IOException { this.allSeedConfigs = ImmutableMap.>builder() - .put(SeedType.STANDARD_SOURCE_DEFINITION, getConfigs(SeedType.STANDARD_SOURCE_DEFINITION)) - .put(SeedType.STANDARD_DESTINATION_DEFINITION, getConfigs(SeedType.STANDARD_DESTINATION_DEFINITION)) + .put(SeedType.STANDARD_SOURCE_DEFINITION, getConfigs(seedDefinitionsResourceClass, SeedType.STANDARD_SOURCE_DEFINITION)) + .put(SeedType.STANDARD_DESTINATION_DEFINITION, getConfigs(seedDefinitionsResourceClass, SeedType.STANDARD_DESTINATION_DEFINITION)) .build(); } + public static void initialize(final Class seedDefinitionsResourceClass) { + try { + INSTANCE = new YamlSeedConfigPersistence(seedDefinitionsResourceClass); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + public static YamlSeedConfigPersistence get() { - return INSTANCE; + synchronized (lock) { + if (INSTANCE == null) { + throw new IllegalStateException("YamlSeedConfigPersistence has not been initialized"); + } + return INSTANCE; + } } @SuppressWarnings("UnstableApiUsage") - private static Map getConfigs(SeedType seedType) throws IOException { - final URL url = Resources.getResource(SeedType.class, seedType.getResourcePath()); + private static Map getConfigs(final Class seedDefinitionsResourceClass, final SeedType seedType) throws IOException { + final URL url = Resources.getResource(seedDefinitionsResourceClass, seedType.getResourcePath()); final String yamlString = Resources.toString(url, StandardCharsets.UTF_8); final JsonNode configList = Yamls.deserialize(yamlString); return MoreIterators.toList(configList.elements()).stream().collect(Collectors.toMap( @@ -86,7 +95,7 @@ private static Map getConfigs(SeedType seedType) throws IOExce } @Override - public T getConfig(AirbyteConfig configType, String configId, Class clazz) + public T getConfig(final AirbyteConfig configType, final String configId, final Class clazz) throws ConfigNotFoundException, JsonValidationException, IOException { final Map configs = allSeedConfigs.get(CONFIG_SCHEMA_MAP.get(configType)); if (configs == null) { @@ -100,7 +109,7 @@ public T getConfig(AirbyteConfig configType, String configId, Class clazz } @Override - public List listConfigs(AirbyteConfig configType, Class clazz) { + public List listConfigs(final AirbyteConfig configType, final Class clazz) { final Map configs = allSeedConfigs.get(CONFIG_SCHEMA_MAP.get(configType)); if (configs == null) { throw new UnsupportedOperationException("There is no seed for " + configType.name()); @@ -109,17 +118,17 @@ public List listConfigs(AirbyteConfig configType, Class clazz) { } @Override - public void writeConfig(AirbyteConfig configType, String configId, T config) { + public void writeConfig(final AirbyteConfig configType, final String configId, final T config) { throw new UnsupportedOperationException("The seed config persistence is read only."); } @Override - public void deleteConfig(AirbyteConfig configType, String configId) { + public void deleteConfig(final AirbyteConfig configType, final String configId) { throw new UnsupportedOperationException("The seed config persistence is read only."); } @Override - public void replaceAllConfigs(Map> configs, boolean dryRun) { + public void replaceAllConfigs(final Map> configs, final boolean dryRun) { throw new UnsupportedOperationException("The seed config persistence is read only."); } @@ -131,7 +140,7 @@ public Map> dumpConfigs() { } @Override - public void loadData(ConfigPersistence seedPersistence) throws IOException { + public void loadData(final ConfigPersistence seedPersistence) throws IOException { throw new UnsupportedOperationException("The seed config persistence is read only."); } diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java index a8b09e6657e3..34b50582c7de 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java @@ -56,6 +56,7 @@ public abstract class BaseDatabaseConfigPersistenceTest { @BeforeAll public static void dbSetup() { + YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); container = new PostgreSQLContainer<>("postgres:13-alpine") .withDatabaseName("airbyte") .withUsername("docker") @@ -75,7 +76,8 @@ public static void dbDown() { static { try { - ConfigPersistence seedPersistence = YamlSeedConfigPersistence.get(); + YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); + final ConfigPersistence seedPersistence = YamlSeedConfigPersistence.get(); SOURCE_GITHUB = seedPersistence .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, "ef69ef6e-aa7f-4af1-a01d-ef775033524e", StandardSourceDefinition.class); SOURCE_POSTGRES = seedPersistence @@ -84,24 +86,26 @@ public static void dbDown() { .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "424892c4-daac-4491-b35d-c6688ba547ba", StandardDestinationDefinition.class); DESTINATION_S3 = seedPersistence .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "4816b78f-1489-44c1-9060-4b19d5fa9362", StandardDestinationDefinition.class); - } catch (Exception e) { + } catch (final Exception e) { throw new RuntimeException(e); } } - protected static void writeSource(ConfigPersistence configPersistence, StandardSourceDefinition source) throws Exception { + protected static void writeSource(final ConfigPersistence configPersistence, final StandardSourceDefinition source) throws Exception { configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source); } - protected static void writeDestination(ConfigPersistence configPersistence, StandardDestinationDefinition destination) throws Exception { + protected static void writeDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination) + throws Exception { configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination); } - protected static void deleteDestination(ConfigPersistence configPersistence, StandardDestinationDefinition destination) throws Exception { + protected static void deleteDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination) + throws Exception { configPersistence.deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString()); } - protected Map> getMapWithSet(Map> input) { + protected Map> getMapWithSet(final Map> input) { return input.entrySet().stream().collect(Collectors.toMap( Entry::getKey, e -> e.getValue().collect(Collectors.toSet()))); @@ -109,22 +113,22 @@ protected Map> getMapWithSet(Map> // assertEquals cannot correctly check the equality of two maps with stream values, // so streams are converted to sets before being compared. - protected void assertSameConfigDump(Map> expected, Map> actual) { + protected void assertSameConfigDump(final Map> expected, final Map> actual) { assertEquals(getMapWithSet(expected), getMapWithSet(actual)); } - protected void assertRecordCount(int expectedCount) throws Exception { - Result> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table("airbyte_configs")).fetch()); + protected void assertRecordCount(final int expectedCount) throws Exception { + final Result> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table("airbyte_configs")).fetch()); assertEquals(expectedCount, recordCount.get(0).value1()); } - protected void assertHasSource(StandardSourceDefinition source) throws Exception { + protected void assertHasSource(final StandardSourceDefinition source) throws Exception { assertEquals(source, configPersistence .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), StandardSourceDefinition.class)); } - protected void assertHasDestination(StandardDestinationDefinition destination) throws Exception { + protected void assertHasDestination(final StandardDestinationDefinition destination) throws Exception { assertEquals(destination, configPersistence .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), StandardDestinationDefinition.class)); diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java index acbd68d04cb2..b47732fdad0b 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java @@ -61,6 +61,7 @@ public class DatabaseConfigPersistenceLoadDataTest extends BaseDatabaseConfigPer @BeforeAll public static void setup() throws Exception { + YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); configPersistence = spy(new DatabaseConfigPersistence(database)); database.query(ctx -> ctx.execute("TRUNCATE TABLE airbyte_configs")); @@ -100,23 +101,23 @@ public void testUpdateConfigsInNonEmptyDatabase() throws Exception { @DisplayName("When a connector is in use, its definition should not be updated") public void testNoUpdateForUsedConnector() throws Exception { // the seed has a newer version of s3 destination and github source - StandardDestinationDefinition destinationS3V2 = YamlSeedConfigPersistence.get() + final StandardDestinationDefinition destinationS3V2 = YamlSeedConfigPersistence.get() .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "4816b78f-1489-44c1-9060-4b19d5fa9362", StandardDestinationDefinition.class) .withDockerImageTag("10000.1.0"); when(seedPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) .thenReturn(Collections.singletonList(destinationS3V2)); - StandardSourceDefinition sourceGithubV2 = YamlSeedConfigPersistence.get() + final StandardSourceDefinition sourceGithubV2 = YamlSeedConfigPersistence.get() .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, "ef69ef6e-aa7f-4af1-a01d-ef775033524e", StandardSourceDefinition.class) .withDockerImageTag("10000.15.3"); when(seedPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) .thenReturn(Collections.singletonList(sourceGithubV2)); // create connections to mark the source and destination as in use - DestinationConnection s3Connection = new DestinationConnection() + final DestinationConnection s3Connection = new DestinationConnection() .withDestinationId(UUID.randomUUID()) .withDestinationDefinitionId(destinationS3V2.getDestinationDefinitionId()); configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, s3Connection.getDestinationId().toString(), s3Connection); - SourceConnection githubConnection = new SourceConnection() + final SourceConnection githubConnection = new SourceConnection() .withSourceId(UUID.randomUUID()) .withSourceDefinitionId(sourceGithubV2.getSourceDefinitionId()); configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, githubConnection.getSourceId().toString(), githubConnection); @@ -132,7 +133,7 @@ public void testNoUpdateForUsedConnector() throws Exception { @DisplayName("When a connector is not in use, its definition should be updated") public void testUpdateForUnusedConnector() throws Exception { // the seed has a newer version of snowflake destination - StandardDestinationDefinition snowflakeV2 = YamlSeedConfigPersistence.get() + final StandardDestinationDefinition snowflakeV2 = YamlSeedConfigPersistence.get() .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "424892c4-daac-4491-b35d-c6688ba547ba", StandardDestinationDefinition.class) .withDockerImageTag("10000.2.0"); when(seedPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java index 91b61ef2a5e6..05a33fdace64 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java @@ -122,8 +122,8 @@ public void testDumpConfigs() throws Exception { writeSource(configPersistence, SOURCE_GITHUB); writeSource(configPersistence, SOURCE_POSTGRES); writeDestination(configPersistence, DESTINATION_S3); - Map> actual = configPersistence.dumpConfigs(); - Map> expected = Map.of( + final Map> actual = configPersistence.dumpConfigs(); + final Map> expected = Map.of( ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), Stream.of(Jsons.jsonNode(SOURCE_GITHUB), Jsons.jsonNode(SOURCE_POSTGRES)), ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(), Stream.of(Jsons.jsonNode(DESTINATION_S3))); assertSameConfigDump(expected, actual); @@ -131,20 +131,20 @@ public void testDumpConfigs() throws Exception { @Test public void testGetConnectorRepositoryToInfoMap() throws Exception { - String connectorRepository = "airbyte/duplicated-connector"; - String oldVersion = "0.1.10"; - String newVersion = "0.2.0"; - StandardSourceDefinition source1 = new StandardSourceDefinition() + final String connectorRepository = "airbyte/duplicated-connector"; + final String oldVersion = "0.1.10"; + final String newVersion = "0.2.0"; + final StandardSourceDefinition source1 = new StandardSourceDefinition() .withSourceDefinitionId(UUID.randomUUID()) .withDockerRepository(connectorRepository) .withDockerImageTag(oldVersion); - StandardSourceDefinition source2 = new StandardSourceDefinition() + final StandardSourceDefinition source2 = new StandardSourceDefinition() .withSourceDefinitionId(UUID.randomUUID()) .withDockerRepository(connectorRepository) .withDockerImageTag(newVersion); writeSource(configPersistence, source1); writeSource(configPersistence, source2); - Map result = database.query(ctx -> configPersistence.getConnectorRepositoryToInfoMap(ctx)); + final Map result = database.query(ctx -> configPersistence.getConnectorRepositoryToInfoMap(ctx)); // when there are duplicated connector definitions, the one with the latest version should be // retrieved assertEquals(newVersion, result.get(connectorRepository).dockerImageTag); @@ -152,12 +152,12 @@ public void testGetConnectorRepositoryToInfoMap() throws Exception { @Test public void testInsertConfigRecord() throws Exception { - OffsetDateTime timestamp = OffsetDateTime.now(); - UUID definitionId = UUID.randomUUID(); - String connectorRepository = "airbyte/test-connector"; + final OffsetDateTime timestamp = OffsetDateTime.now(); + final UUID definitionId = UUID.randomUUID(); + final String connectorRepository = "airbyte/test-connector"; // when the record does not exist, it is inserted - StandardSourceDefinition source1 = new StandardSourceDefinition() + final StandardSourceDefinition source1 = new StandardSourceDefinition() .withSourceDefinitionId(definitionId) .withDockerRepository(connectorRepository) .withDockerImageTag("0.1.2"); @@ -175,7 +175,7 @@ public void testInsertConfigRecord() throws Exception { assertHasSource(SOURCE_GITHUB); // when the record already exists, it is ignored - StandardSourceDefinition source2 = new StandardSourceDefinition() + final StandardSourceDefinition source2 = new StandardSourceDefinition() .withSourceDefinitionId(definitionId) .withDockerRepository(connectorRepository) .withDockerImageTag("0.1.5"); @@ -193,11 +193,11 @@ public void testInsertConfigRecord() throws Exception { @Test public void testUpdateConfigRecord() throws Exception { - OffsetDateTime timestamp = OffsetDateTime.now(); - UUID definitionId = UUID.randomUUID(); - String connectorRepository = "airbyte/test-connector"; + final OffsetDateTime timestamp = OffsetDateTime.now(); + final UUID definitionId = UUID.randomUUID(); + final String connectorRepository = "airbyte/test-connector"; - StandardSourceDefinition oldSource = new StandardSourceDefinition() + final StandardSourceDefinition oldSource = new StandardSourceDefinition() .withSourceDefinitionId(definitionId) .withDockerRepository(connectorRepository) .withDockerImageTag("0.3.5"); @@ -208,7 +208,7 @@ public void testUpdateConfigRecord() throws Exception { assertHasSource(oldSource); assertHasSource(SOURCE_GITHUB); - StandardSourceDefinition newSource = new StandardSourceDefinition() + final StandardSourceDefinition newSource = new StandardSourceDefinition() .withSourceDefinitionId(definitionId) .withDockerRepository(connectorRepository) .withDockerImageTag("0.3.5"); @@ -231,8 +231,8 @@ public void testHasNewVersion() { @Test public void testGetNewFields() { - JsonNode o1 = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }"); - JsonNode o2 = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3 }"); + final JsonNode o1 = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }"); + final JsonNode o2 = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3 }"); assertEquals(Collections.emptySet(), DatabaseConfigPersistence.getNewFields(o1, o1)); assertEquals(Collections.singleton("field3"), DatabaseConfigPersistence.getNewFields(o1, o2)); assertEquals(Collections.singleton("field2"), DatabaseConfigPersistence.getNewFields(o2, o1)); @@ -240,13 +240,13 @@ public void testGetNewFields() { @Test public void testGetDefinitionWithNewFields() { - JsonNode current = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }"); - JsonNode latest = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3, \"field4\": 4 }"); - Set newFields = Set.of("field3"); + final JsonNode current = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }"); + final JsonNode latest = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3, \"field4\": 4 }"); + final Set newFields = Set.of("field3"); assertEquals(current, DatabaseConfigPersistence.getDefinitionWithNewFields(current, latest, Collections.emptySet())); - JsonNode currentWithNewFields = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2, \"field3\": 3 }"); + final JsonNode currentWithNewFields = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2, \"field3\": 3 }"); assertEquals(currentWithNewFields, DatabaseConfigPersistence.getDefinitionWithNewFields(current, latest, newFields)); } diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java index bbf7752c8c8e..737f13f2b122 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java @@ -83,8 +83,8 @@ public void testNewConnector() throws Exception { @Test @DisplayName("When an old connector is in use, if it has all fields, do not update it") public void testOldConnectorInUseWithAllFields() throws Exception { - StandardSourceDefinition currentSource = getSource().withDockerImageTag("0.0.0"); - StandardSourceDefinition latestSource = getSource().withDockerImageTag("0.1000.0"); + final StandardSourceDefinition currentSource = getSource().withDockerImageTag("0.0.0"); + final StandardSourceDefinition latestSource = getSource().withDockerImageTag("0.1000.0"); assertUpdateConnectorDefinition( Collections.singletonList(currentSource), @@ -96,9 +96,9 @@ public void testOldConnectorInUseWithAllFields() throws Exception { @Test @DisplayName("When a old connector is in use, add missing fields, do not update its version") public void testOldConnectorInUseWithMissingFields() throws Exception { - StandardSourceDefinition currentSource = getSource().withDockerImageTag("0.0.0").withDocumentationUrl(null).withSourceType(null); - StandardSourceDefinition latestSource = getSource().withDockerImageTag("0.1000.0"); - StandardSourceDefinition currentSourceWithNewFields = getSource().withDockerImageTag("0.0.0"); + final StandardSourceDefinition currentSource = getSource().withDockerImageTag("0.0.0").withDocumentationUrl(null).withSourceType(null); + final StandardSourceDefinition latestSource = getSource().withDockerImageTag("0.1000.0"); + final StandardSourceDefinition currentSourceWithNewFields = getSource().withDockerImageTag("0.0.0"); assertUpdateConnectorDefinition( Collections.singletonList(currentSource), @@ -110,8 +110,8 @@ public void testOldConnectorInUseWithMissingFields() throws Exception { @Test @DisplayName("When an unused connector has a new version, update it") public void testUnusedConnectorWithOldVersion() throws Exception { - StandardSourceDefinition currentSource = getSource().withDockerImageTag("0.0.0"); - StandardSourceDefinition latestSource = getSource().withDockerImageTag("0.1000.0"); + final StandardSourceDefinition currentSource = getSource().withDockerImageTag("0.0.0"); + final StandardSourceDefinition latestSource = getSource().withDockerImageTag("0.1000.0"); assertUpdateConnectorDefinition( Collections.singletonList(currentSource), @@ -123,9 +123,9 @@ public void testUnusedConnectorWithOldVersion() throws Exception { @Test @DisplayName("When an unused connector has missing fields, add the missing fields, do not update its version") public void testUnusedConnectorWithMissingFields() throws Exception { - StandardSourceDefinition currentSource = getSource().withDockerImageTag("0.1000.0").withDocumentationUrl(null).withSourceType(null); - StandardSourceDefinition latestSource = getSource().withDockerImageTag("0.99.0"); - StandardSourceDefinition currentSourceWithNewFields = getSource().withDockerImageTag("0.1000.0"); + final StandardSourceDefinition currentSource = getSource().withDockerImageTag("0.1000.0").withDocumentationUrl(null).withSourceType(null); + final StandardSourceDefinition latestSource = getSource().withDockerImageTag("0.99.0"); + final StandardSourceDefinition currentSourceWithNewFields = getSource().withDockerImageTag("0.1000.0"); assertUpdateConnectorDefinition( Collections.singletonList(currentSource), @@ -145,23 +145,23 @@ private StandardSourceDefinition getSource() { * @param currentSources all sources currently exist in the database * @param currentSourcesInUse a subset of currentSources; sources currently used in data syncing */ - private void assertUpdateConnectorDefinition(List currentSources, - List currentSourcesInUse, - List latestSources, - List expectedUpdatedSources) + private void assertUpdateConnectorDefinition(final List currentSources, + final List currentSourcesInUse, + final List latestSources, + final List expectedUpdatedSources) throws Exception { - for (StandardSourceDefinition source : currentSources) { + for (final StandardSourceDefinition source : currentSources) { writeSource(configPersistence, source); } - for (StandardSourceDefinition source : currentSourcesInUse) { + for (final StandardSourceDefinition source : currentSourcesInUse) { assertTrue(currentSources.contains(source), "currentSourcesInUse must exist in currentSources"); } - Set sourceRepositoriesInUse = currentSourcesInUse.stream() + final Set sourceRepositoriesInUse = currentSourcesInUse.stream() .map(StandardSourceDefinition::getDockerRepository) .collect(Collectors.toSet()); - Map currentSourceRepositoryToInfo = currentSources.stream() + final Map currentSourceRepositoryToInfo = currentSources.stream() .collect(Collectors.toMap( StandardSourceDefinition::getDockerRepository, s -> new ConnectorInfo(s.getSourceDefinitionId().toString(), Jsons.jsonNode(s)))); @@ -175,14 +175,14 @@ private void assertUpdateConnectorDefinition(List curr latestSources, sourceRepositoriesInUse, currentSourceRepositoryToInfo); - } catch (IOException e) { + } catch (final IOException e) { throw new SQLException(e); } return null; }); assertRecordCount(expectedUpdatedSources.size()); - for (StandardSourceDefinition source : expectedUpdatedSources) { + for (final StandardSourceDefinition source : expectedUpdatedSources) { assertHasSource(source); } } diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java index 87fe31181840..cc16ad3be22c 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java @@ -37,11 +37,18 @@ import java.util.Collections; import java.util.Map; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; public class YamlSeedConfigPersistenceTest { - private static final YamlSeedConfigPersistence PERSISTENCE = YamlSeedConfigPersistence.get(); + private static YamlSeedConfigPersistence PERSISTENCE; + + @BeforeAll + static void setup() { + YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); + PERSISTENCE = YamlSeedConfigPersistence.get(); + } @Test public void testGetConfig() throws Exception { @@ -75,7 +82,7 @@ public void testGetInvalidConfig() { @Test public void testDumpConfigs() { - Map> allSeedConfigs = PERSISTENCE.dumpConfigs(); + final Map> allSeedConfigs = PERSISTENCE.dumpConfigs(); assertEquals(2, allSeedConfigs.size()); assertTrue(allSeedConfigs.get(ConfigSchema.STANDARD_SOURCE_DEFINITION.name()).findAny().isPresent()); assertTrue(allSeedConfigs.get(ConfigSchema.STANDARD_DESTINATION_DEFINITION.name()).findAny().isPresent()); diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java index bb8285855e04..9dce2e920834 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java @@ -26,7 +26,9 @@ import io.airbyte.commons.io.FileTtlManager; import io.airbyte.config.Configs; +import io.airbyte.config.init.SeedType; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.db.Database; import io.airbyte.scheduler.client.SchedulerJobClient; import io.airbyte.scheduler.client.SpecCachingSynchronousSchedulerClient; @@ -51,14 +53,16 @@ ServerRunnable create(SchedulerJobClient schedulerJobClient, class Api implements ServerFactory { @Override - public ServerRunnable create(SchedulerJobClient schedulerJobClient, - SpecCachingSynchronousSchedulerClient cachingSchedulerClient, - WorkflowServiceStubs temporalService, - ConfigRepository configRepository, - JobPersistence jobPersistence, - Database configsDatabase, - Database jobsDatabase, - Configs configs) { + public ServerRunnable create(final SchedulerJobClient schedulerJobClient, + final SpecCachingSynchronousSchedulerClient cachingSchedulerClient, + final WorkflowServiceStubs temporalService, + final ConfigRepository configRepository, + final JobPersistence jobPersistence, + final Database configsDatabase, + final Database jobsDatabase, + final Configs configs) { + YamlSeedConfigPersistence.initialize(SeedType.class); + // set static values for factory ConfigurationApiFactory.setSchedulerJobClient(schedulerJobClient); ConfigurationApiFactory.setSynchronousSchedulerClient(cachingSchedulerClient); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java index 500c369dbe78..ce8184eab9d5 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java @@ -100,12 +100,13 @@ public NoOpFileTtlManager() { super(1L, TimeUnit.MINUTES, 1L); } - public void register(Path path) {} + public void register(final Path path) {} } @BeforeAll public static void dbSetup() { + YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); container = new PostgreSQLContainer<>("postgres:13-alpine") .withDatabaseName("airbyte") .withUsername("docker") @@ -166,21 +167,21 @@ void testFullExportImportRoundTrip() throws Exception { // After importing the configs, the dump is restored. assertTrue(archive.exists()); - ImportRead importResult = archiveHandler.importData(archive); + final ImportRead importResult = archiveHandler.importData(archive); assertFalse(archive.exists()); assertEquals(StatusEnum.SUCCEEDED, importResult.getStatus()); assertSameConfigDump(seedPersistence.dumpConfigs(), configRepository.dumpConfigs()); // When a connector definition is in use, it will not be updated. - UUID sourceS3DefinitionId = UUID.fromString("69589781-7828-43c5-9f63-8925b1c1ccc2"); - String sourceS3DefinitionVersion = "0.0.0"; - StandardSourceDefinition sourceS3Definition = seedPersistence.getConfig( + final UUID sourceS3DefinitionId = UUID.fromString("69589781-7828-43c5-9f63-8925b1c1ccc2"); + final String sourceS3DefinitionVersion = "0.0.0"; + final StandardSourceDefinition sourceS3Definition = seedPersistence.getConfig( ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceS3DefinitionId.toString(), StandardSourceDefinition.class) // This source definition is on an old version .withDockerImageTag(sourceS3DefinitionVersion); - SourceConnection sourceConnection = new SourceConnection() + final SourceConnection sourceConnection = new SourceConnection() .withSourceDefinitionId(sourceS3DefinitionId) .withSourceId(UUID.randomUUID()) .withWorkspaceId(UUID.randomUUID()) @@ -198,7 +199,7 @@ void testFullExportImportRoundTrip() throws Exception { archiveHandler.importData(archive); // The version has not changed. - StandardSourceDefinition actualS3Definition = configPersistence.getConfig( + final StandardSourceDefinition actualS3Definition = configPersistence.getConfig( ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceS3DefinitionId.toString(), StandardSourceDefinition.class); @@ -219,7 +220,7 @@ void testLightWeightExportImportRoundTrip() throws Exception { // Export the first workspace configs File archive = archiveHandler.exportWorkspace(new WorkspaceIdRequestBody().workspaceId(workspaceId)); - File secondArchive = Files.createTempFile("tests", "archive").toFile(); + final File secondArchive = Files.createTempFile("tests", "archive").toFile(); FileUtils.copyFile(archive, secondArchive); // After deleting all the configs, the dump becomes empty. @@ -251,9 +252,9 @@ void testLightWeightExportImportRoundTrip() throws Exception { setupWorkspaceData(secondWorkspaceId); // the archive is importing again in another workspace - UploadRead secondUploadRead = archiveHandler.uploadArchiveResource(secondArchive); + final UploadRead secondUploadRead = archiveHandler.uploadArchiveResource(secondArchive); assertEquals(UploadRead.StatusEnum.SUCCEEDED, secondUploadRead.getStatus()); - ImportRead secondImportResult = archiveHandler.importIntoWorkspace(new ImportRequestBody() + final ImportRead secondImportResult = archiveHandler.importIntoWorkspace(new ImportRequestBody() .resourceId(secondUploadRead.getResourceId()) .workspaceId(secondWorkspaceId)); assertEquals(StatusEnum.SUCCEEDED, secondImportResult.getStatus()); @@ -272,7 +273,7 @@ void testLightWeightExportImportRoundTrip() throws Exception { .withTombstone(false) .withConfiguration(Jsons.emptyObject()); - ConnectorSpecification emptyConnectorSpec = mock(ConnectorSpecification.class); + final ConnectorSpecification emptyConnectorSpec = mock(ConnectorSpecification.class); when(emptyConnectorSpec.getConnectionSpecification()).thenReturn(Jsons.emptyObject()); configRepository.writeSourceConnection(sourceConnection, emptyConnectorSpec); @@ -292,14 +293,14 @@ void testLightWeightExportImportRoundTrip() throws Exception { assertSameConfigDump(secondWorkspaceDump, configRepository.dumpConfigs()); } - private void setupWorkspaceData(UUID workspaceId) throws IOException { + private void setupWorkspaceData(final UUID workspaceId) throws IOException { configPersistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), new StandardWorkspace() .withWorkspaceId(workspaceId) .withName("test-workspace") .withTombstone(false)); } - private void setupTestData(UUID workspaceId) throws JsonValidationException, IOException { + private void setupTestData(final UUID workspaceId) throws JsonValidationException, IOException { // Fill up with some configurations setupWorkspaceData(workspaceId); final UUID sourceid = UUID.randomUUID(); @@ -320,16 +321,16 @@ private void setupTestData(UUID workspaceId) throws JsonValidationException, IOE .withTombstone(false)); } - private void assertSameConfigDump(Map> expected, Map> actual) { + private void assertSameConfigDump(final Map> expected, final Map> actual) { assertEquals(expected.keySet(), actual.keySet(), String.format("The expected (%s) vs actual (%s) streams does not match", expected.size(), actual.size())); - for (String stream : expected.keySet()) { + for (final String stream : expected.keySet()) { LOGGER.info("Checking stream {}", stream); // assertEquals cannot correctly check the equality of two maps with stream values, // so streams are converted to sets before being compared. final Set expectedRecords = expected.get(stream).collect(Collectors.toSet()); final Set actualRecords = actual.get(stream).collect(Collectors.toSet()); - for (var expectedRecord : expectedRecords) { + for (final var expectedRecord : expectedRecords) { assertTrue(actualRecords.contains(expectedRecord), String.format("\n Expected record was not found:\n%s\n Actual records were:\n%s\n", expectedRecord, From eda4c04867d1dfb7a3305acc54ded3ce9464f75b Mon Sep 17 00:00:00 2001 From: cgardens Date: Fri, 24 Sep 2021 06:03:14 -0700 Subject: [PATCH 2/3] yaml seed us DI instead of singleton --- .../YamlSeedConfigPersistence.java | 28 +++------- .../BaseDatabaseConfigPersistenceTest.java | 3 +- ...DatabaseConfigPersistenceLoadDataTest.java | 7 ++- .../YamlSeedConfigPersistenceTest.java | 3 +- .../server/ConfigurationApiFactory.java | 45 +++++++--------- .../java/io/airbyte/server/ServerApp.java | 13 +++-- .../java/io/airbyte/server/ServerFactory.java | 28 +++++----- .../airbyte/server/apis/ConfigurationApi.java | 54 +++++++++++-------- .../server/handlers/ArchiveHandler.java | 27 ++++++---- .../server/handlers/ArchiveHandlerTest.java | 4 +- .../server/migration/RunMigrationTest.java | 38 ++++++------- 11 files changed, 122 insertions(+), 128 deletions(-) diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java index 4d451f00ae78..51e2afee93b4 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java @@ -54,12 +54,17 @@ public class YamlSeedConfigPersistence implements ConfigPersistence { ConfigSchema.STANDARD_SOURCE_DEFINITION, SeedType.STANDARD_SOURCE_DEFINITION, ConfigSchema.STANDARD_DESTINATION_DEFINITION, SeedType.STANDARD_DESTINATION_DEFINITION); - private static final Object lock = new Object(); - private static YamlSeedConfigPersistence INSTANCE; - // A mapping from seed config type to config UUID to config. private final ImmutableMap> allSeedConfigs; + public static YamlSeedConfigPersistence getDefault() throws IOException { + return new YamlSeedConfigPersistence(DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); + } + + public static YamlSeedConfigPersistence get(final Class seedDefinitionsResourceClass) throws IOException { + return new YamlSeedConfigPersistence(seedDefinitionsResourceClass); + } + private YamlSeedConfigPersistence(final Class seedDefinitionsResourceClass) throws IOException { this.allSeedConfigs = ImmutableMap.>builder() .put(SeedType.STANDARD_SOURCE_DEFINITION, getConfigs(seedDefinitionsResourceClass, SeedType.STANDARD_SOURCE_DEFINITION)) @@ -67,23 +72,6 @@ private YamlSeedConfigPersistence(final Class seedDefinitionsResourceClass) t .build(); } - public static void initialize(final Class seedDefinitionsResourceClass) { - try { - INSTANCE = new YamlSeedConfigPersistence(seedDefinitionsResourceClass); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - public static YamlSeedConfigPersistence get() { - synchronized (lock) { - if (INSTANCE == null) { - throw new IllegalStateException("YamlSeedConfigPersistence has not been initialized"); - } - return INSTANCE; - } - } - @SuppressWarnings("UnstableApiUsage") private static Map getConfigs(final Class seedDefinitionsResourceClass, final SeedType seedType) throws IOException { final URL url = Resources.getResource(seedDefinitionsResourceClass, seedType.getResourcePath()); diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java index 34b50582c7de..3acc2075db56 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java @@ -76,8 +76,7 @@ public static void dbDown() { static { try { - YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); - final ConfigPersistence seedPersistence = YamlSeedConfigPersistence.get(); + final ConfigPersistence seedPersistence = YamlSeedConfigPersistence.getDefault(); SOURCE_GITHUB = seedPersistence .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, "ef69ef6e-aa7f-4af1-a01d-ef775033524e", StandardSourceDefinition.class); SOURCE_POSTGRES = seedPersistence diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java index b47732fdad0b..f66d08983f13 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java @@ -61,7 +61,6 @@ public class DatabaseConfigPersistenceLoadDataTest extends BaseDatabaseConfigPer @BeforeAll public static void setup() throws Exception { - YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); configPersistence = spy(new DatabaseConfigPersistence(database)); database.query(ctx -> ctx.execute("TRUNCATE TABLE airbyte_configs")); @@ -101,12 +100,12 @@ public void testUpdateConfigsInNonEmptyDatabase() throws Exception { @DisplayName("When a connector is in use, its definition should not be updated") public void testNoUpdateForUsedConnector() throws Exception { // the seed has a newer version of s3 destination and github source - final StandardDestinationDefinition destinationS3V2 = YamlSeedConfigPersistence.get() + final StandardDestinationDefinition destinationS3V2 = YamlSeedConfigPersistence.getDefault() .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "4816b78f-1489-44c1-9060-4b19d5fa9362", StandardDestinationDefinition.class) .withDockerImageTag("10000.1.0"); when(seedPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) .thenReturn(Collections.singletonList(destinationS3V2)); - final StandardSourceDefinition sourceGithubV2 = YamlSeedConfigPersistence.get() + final StandardSourceDefinition sourceGithubV2 = YamlSeedConfigPersistence.getDefault() .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, "ef69ef6e-aa7f-4af1-a01d-ef775033524e", StandardSourceDefinition.class) .withDockerImageTag("10000.15.3"); when(seedPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) @@ -133,7 +132,7 @@ public void testNoUpdateForUsedConnector() throws Exception { @DisplayName("When a connector is not in use, its definition should be updated") public void testUpdateForUnusedConnector() throws Exception { // the seed has a newer version of snowflake destination - final StandardDestinationDefinition snowflakeV2 = YamlSeedConfigPersistence.get() + final StandardDestinationDefinition snowflakeV2 = YamlSeedConfigPersistence.getDefault() .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "424892c4-daac-4491-b35d-c6688ba547ba", StandardDestinationDefinition.class) .withDockerImageTag("10000.2.0"); when(seedPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java index cc16ad3be22c..667061ddd97c 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java @@ -46,8 +46,7 @@ public class YamlSeedConfigPersistenceTest { @BeforeAll static void setup() { - YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); - PERSISTENCE = YamlSeedConfigPersistence.get(); + PERSISTENCE = YamlSeedConfigPersistence.getDefault(); } @Test diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java index afafb7d3a247..8180102eeb23 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java @@ -26,6 +26,7 @@ import io.airbyte.commons.io.FileTtlManager; import io.airbyte.config.Configs; +import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.Database; import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient; @@ -42,6 +43,7 @@ public class ConfigurationApiFactory implements Factory { private static WorkflowServiceStubs temporalService; private static ConfigRepository configRepository; private static JobPersistence jobPersistence; + private static ConfigPersistence seed; private static SchedulerJobClient schedulerJobClient; private static CachingSynchronousSchedulerClient synchronousSchedulerClient; private static Configs configs; @@ -50,50 +52,39 @@ public class ConfigurationApiFactory implements Factory { private static Database configsDatabase; private static Database jobsDatabase; - public static void setConfigRepository(final ConfigRepository configRepository) { + public static void setValues( + final WorkflowServiceStubs temporalService, + final ConfigRepository configRepository, + final JobPersistence jobPersistence, + final ConfigPersistence seed, + final SchedulerJobClient schedulerJobClient, + final CachingSynchronousSchedulerClient synchronousSchedulerClient, + final Configs configs, + final FileTtlManager archiveTtlManager, + final Map mdc, + final Database configsDatabase, + final Database jobsDatabase) { ConfigurationApiFactory.configRepository = configRepository; - } - - public static void setJobPersistence(final JobPersistence jobPersistence) { ConfigurationApiFactory.jobPersistence = jobPersistence; - } - - public static void setSchedulerJobClient(final SchedulerJobClient schedulerJobClient) { + ConfigurationApiFactory.seed = seed; ConfigurationApiFactory.schedulerJobClient = schedulerJobClient; - } - - public static void setSynchronousSchedulerClient(final CachingSynchronousSchedulerClient synchronousSchedulerClient) { ConfigurationApiFactory.synchronousSchedulerClient = synchronousSchedulerClient; - } - - public static void setConfigs(Configs configs) { ConfigurationApiFactory.configs = configs; - } - - public static void setArchiveTtlManager(final FileTtlManager archiveTtlManager) { ConfigurationApiFactory.archiveTtlManager = archiveTtlManager; - } - - public static void setMdc(Map mdc) { ConfigurationApiFactory.mdc = mdc; - } - - public static void setTemporalService(final WorkflowServiceStubs temporalService) { ConfigurationApiFactory.temporalService = temporalService; - } - - public static void setDatabases(final Database configsDatabase, final Database jobsDatabase) { ConfigurationApiFactory.configsDatabase = configsDatabase; ConfigurationApiFactory.jobsDatabase = jobsDatabase; } @Override public ConfigurationApi provide() { - MDC.setContextMap(mdc); + MDC.setContextMap(ConfigurationApiFactory.mdc); return new ConfigurationApi( ConfigurationApiFactory.configRepository, ConfigurationApiFactory.jobPersistence, + ConfigurationApiFactory.seed, ConfigurationApiFactory.schedulerJobClient, ConfigurationApiFactory.synchronousSchedulerClient, ConfigurationApiFactory.configs, @@ -104,7 +95,7 @@ public ConfigurationApi provide() { } @Override - public void dispose(ConfigurationApi service) { + public void dispose(final ConfigurationApi service) { /* noop */ } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 516b6bbcf757..8f2129f7d072 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -33,6 +33,7 @@ import io.airbyte.config.EnvConfigs; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.helpers.LogClientSingleton; +import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.YamlSeedConfigPersistence; @@ -169,7 +170,7 @@ private static void createWorkspaceIfNoneExists(final ConfigRepository configRep TrackingClientSingleton.get().identify(workspaceId); } - public static ServerRunnable getServer(final ServerFactory apiFactory) throws Exception { + public static ServerRunnable getServer(final ServerFactory apiFactory, final ConfigPersistence seed) throws Exception { final Configs configs = new EnvConfigs(); LogClientSingleton.setWorkspaceMdc(LogClientSingleton.getServerLogsRoot(configs)); @@ -230,7 +231,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex final boolean versionSupportsAutoMigrate = new AirbyteVersion(airbyteDatabaseVersion.get()).patchVersionCompareTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION) >= 0; if (!isKubernetes || versionSupportsAutoMigrate) { - runAutomaticMigration(configRepository, jobPersistence, specFetcher, airbyteVersion, airbyteDatabaseVersion.get()); + runAutomaticMigration(configRepository, jobPersistence, seed, specFetcher, airbyteVersion, airbyteDatabaseVersion.get()); // After migration, upgrade the DB version airbyteDatabaseVersion = jobPersistence.getVersion(); } else { @@ -242,7 +243,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex LOGGER.info("Starting server..."); runFlywayMigration(configs, configDatabase, jobDatabase); - configPersistence.loadData(YamlSeedConfigPersistence.get()); + configPersistence.loadData(seed); return apiFactory.create( schedulerJobClient, @@ -250,6 +251,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex temporalService, configRepository, jobPersistence, + seed, configDatabase, jobDatabase, configs); @@ -260,7 +262,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex } public static void main(final String[] args) throws Exception { - getServer(new ServerFactory.Api()).start(); + getServer(new ServerFactory.Api(), YamlSeedConfigPersistence.getDefault()).start(); } /** @@ -269,6 +271,7 @@ public static void main(final String[] args) throws Exception { */ private static void runAutomaticMigration(final ConfigRepository configRepository, final JobPersistence jobPersistence, + final ConfigPersistence seed, final SpecFetcher specFetcher, final String airbyteVersion, final String airbyteDatabaseVersion) { @@ -277,7 +280,7 @@ private static void runAutomaticMigration(final ConfigRepository configRepositor jobPersistence, configRepository, airbyteVersion, - YamlSeedConfigPersistence.get(), + seed, specFetcher)) { runMigration.run(); } catch (final Exception e) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java index 9dce2e920834..bea7bd01a887 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java @@ -26,9 +26,8 @@ import io.airbyte.commons.io.FileTtlManager; import io.airbyte.config.Configs; -import io.airbyte.config.init.SeedType; +import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.db.Database; import io.airbyte.scheduler.client.SchedulerJobClient; import io.airbyte.scheduler.client.SpecCachingSynchronousSchedulerClient; @@ -46,6 +45,8 @@ ServerRunnable create(SchedulerJobClient schedulerJobClient, WorkflowServiceStubs temporalService, ConfigRepository configRepository, JobPersistence jobPersistence, + ConfigPersistence seed, + Database configsDatabase, Database jobsDatabase, Configs configs); @@ -58,21 +59,22 @@ public ServerRunnable create(final SchedulerJobClient schedulerJobClient, final WorkflowServiceStubs temporalService, final ConfigRepository configRepository, final JobPersistence jobPersistence, + final ConfigPersistence seed, final Database configsDatabase, final Database jobsDatabase, final Configs configs) { - YamlSeedConfigPersistence.initialize(SeedType.class); - // set static values for factory - ConfigurationApiFactory.setSchedulerJobClient(schedulerJobClient); - ConfigurationApiFactory.setSynchronousSchedulerClient(cachingSchedulerClient); - ConfigurationApiFactory.setTemporalService(temporalService); - ConfigurationApiFactory.setConfigRepository(configRepository); - ConfigurationApiFactory.setJobPersistence(jobPersistence); - ConfigurationApiFactory.setConfigs(configs); - ConfigurationApiFactory.setArchiveTtlManager(new FileTtlManager(10, TimeUnit.MINUTES, 10)); - ConfigurationApiFactory.setMdc(MDC.getCopyOfContextMap()); - ConfigurationApiFactory.setDatabases(configsDatabase, jobsDatabase); + ConfigurationApiFactory.setValues( + temporalService, + configRepository, + jobPersistence, + seed, + schedulerJobClient, + cachingSchedulerClient, + configs, + new FileTtlManager(10, TimeUnit.MINUTES, 10), + MDC.getCopyOfContextMap(), + configsDatabase, jobsDatabase); // server configurations final Set> componentClasses = Set.of(ConfigurationApi.class); diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index 295b7a59e520..b132b52506e3 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -98,6 +98,7 @@ import io.airbyte.commons.io.FileTtlManager; import io.airbyte.config.Configs; import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.Database; import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient; @@ -159,6 +160,7 @@ public class ConfigurationApi implements io.airbyte.api.V1Api { public ConfigurationApi(final ConfigRepository configRepository, final JobPersistence jobPersistence, + final ConfigPersistence seed, final SchedulerJobClient schedulerJobClient, final CachingSynchronousSchedulerClient synchronousSchedulerClient, final Configs configs, @@ -197,8 +199,14 @@ public ConfigurationApi(final ConfigRepository configRepository, webBackendSourcesHandler = new WebBackendSourcesHandler(sourceHandler, configRepository); webBackendDestinationsHandler = new WebBackendDestinationsHandler(destinationHandler, configRepository); healthCheckHandler = new HealthCheckHandler(configRepository); - archiveHandler = - new ArchiveHandler(configs.getAirbyteVersion(), configRepository, jobPersistence, workspaceHelper, archiveTtlManager, specFetcher); + archiveHandler = new ArchiveHandler( + configs.getAirbyteVersion(), + configRepository, + jobPersistence, + seed, + workspaceHelper, + archiveTtlManager, + specFetcher); logsHandler = new LogsHandler(); openApiConfigHandler = new OpenApiConfigHandler(); dbMigrationHandler = new DbMigrationHandler(configsDatabase, jobsDatabase); @@ -282,27 +290,27 @@ public SourceDefinitionSpecificationRead getSourceDefinitionSpecification(final // OAUTH @Override - public OAuthConsentRead getSourceOAuthConsent(SourceOauthConsentRequest sourceOauthConsentRequest) { + public OAuthConsentRead getSourceOAuthConsent(final SourceOauthConsentRequest sourceOauthConsentRequest) { return execute(() -> oAuthHandler.getSourceOAuthConsent(sourceOauthConsentRequest)); } @Override - public Map completeSourceOAuth(CompleteSourceOauthRequest completeSourceOauthRequest) { + public Map completeSourceOAuth(final CompleteSourceOauthRequest completeSourceOauthRequest) { return execute(() -> oAuthHandler.completeSourceOAuth(completeSourceOauthRequest)); } @Override - public OAuthConsentRead getDestinationOAuthConsent(DestinationOauthConsentRequest destinationOauthConsentRequest) { + public OAuthConsentRead getDestinationOAuthConsent(final DestinationOauthConsentRequest destinationOauthConsentRequest) { return execute(() -> oAuthHandler.getDestinationOAuthConsent(destinationOauthConsentRequest)); } @Override - public Map completeDestinationOAuth(CompleteDestinationOAuthRequest requestBody) { + public Map completeDestinationOAuth(final CompleteDestinationOAuthRequest requestBody) { return execute(() -> oAuthHandler.completeDestinationOAuth(requestBody)); } @Override - public void setInstancewideDestinationOauthParams(SetInstancewideDestinationOauthParamsRequestBody requestBody) { + public void setInstancewideDestinationOauthParams(final SetInstancewideDestinationOauthParamsRequestBody requestBody) { execute(() -> { oAuthHandler.setDestinationInstancewideOauthParams(requestBody); return null; @@ -310,7 +318,7 @@ public void setInstancewideDestinationOauthParams(SetInstancewideDestinationOaut } @Override - public void setInstancewideSourceOauthParams(SetInstancewideSourceOauthParamsRequestBody requestBody) { + public void setInstancewideSourceOauthParams(final SetInstancewideSourceOauthParamsRequestBody requestBody) { execute(() -> { oAuthHandler.setSourceInstancewideOauthParams(requestBody); return null; @@ -365,12 +373,12 @@ public SourceDiscoverSchemaRead discoverSchemaForSource(final SourceIdRequestBod // DB MIGRATION @Override - public DbMigrationReadList listMigrations(DbMigrationRequestBody request) { + public DbMigrationReadList listMigrations(final DbMigrationRequestBody request) { return execute(() -> dbMigrationHandler.list(request)); } @Override - public DbMigrationExecutionRead executeMigrations(DbMigrationRequestBody request) { + public DbMigrationExecutionRead executeMigrations(final DbMigrationRequestBody request) { return execute(() -> dbMigrationHandler.migrate(request)); } @@ -492,7 +500,7 @@ public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdReq // Operations @Override - public CheckOperationRead checkOperation(OperatorConfiguration operatorConfiguration) { + public CheckOperationRead checkOperation(final OperatorConfiguration operatorConfiguration) { return execute(() -> operationsHandler.checkOperation(operatorConfiguration)); } @@ -502,7 +510,7 @@ public OperationRead createOperation(final OperationCreate operationCreate) { } @Override - public void deleteOperation(OperationIdRequestBody operationIdRequestBody) { + public void deleteOperation(final OperationIdRequestBody operationIdRequestBody) { execute(() -> { operationsHandler.deleteOperation(operationIdRequestBody); return null; @@ -510,17 +518,17 @@ public void deleteOperation(OperationIdRequestBody operationIdRequestBody) { } @Override - public OperationReadList listOperationsForConnection(ConnectionIdRequestBody connectionIdRequestBody) { + public OperationReadList listOperationsForConnection(final ConnectionIdRequestBody connectionIdRequestBody) { return execute(() -> operationsHandler.listOperationsForConnection(connectionIdRequestBody)); } @Override - public OperationRead getOperation(OperationIdRequestBody operationIdRequestBody) { + public OperationRead getOperation(final OperationIdRequestBody operationIdRequestBody) { return execute(() -> operationsHandler.getOperation(operationIdRequestBody)); } @Override - public OperationRead updateOperation(OperationUpdate operationUpdate) { + public OperationRead updateOperation(final OperationUpdate operationUpdate) { return execute(() -> operationsHandler.updateOperation(operationUpdate)); } @@ -591,7 +599,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti } @Override - public WebBackendConnectionRead webBackendCreateConnection(WebBackendConnectionCreate webBackendConnectionCreate) { + public WebBackendConnectionRead webBackendCreateConnection(final WebBackendConnectionCreate webBackendConnectionCreate) { return execute(() -> webBackendConnectionsHandler.webBackendCreateConnection(webBackendConnectionCreate)); } @@ -623,30 +631,30 @@ public ImportRead importArchive(final File archiveFile) { } @Override - public File exportWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) { + public File exportWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) { return execute(() -> archiveHandler.exportWorkspace(workspaceIdRequestBody)); } @Override - public UploadRead uploadArchiveResource(File archiveFile) { + public UploadRead uploadArchiveResource(final File archiveFile) { return execute(() -> archiveHandler.uploadArchiveResource(archiveFile)); } @Override - public ImportRead importIntoWorkspace(ImportRequestBody importRequestBody) { + public ImportRead importIntoWorkspace(final ImportRequestBody importRequestBody) { return execute(() -> archiveHandler.importIntoWorkspace(importRequestBody)); } - private T execute(HandlerCall call) { + private T execute(final HandlerCall call) { try { return call.call(); - } catch (ConfigNotFoundException e) { + } catch (final ConfigNotFoundException e) { throw new IdNotFoundKnownException(String.format("Could not find configuration for %s: %s.", e.getType().toString(), e.getConfigId()), e.getConfigId(), e); - } catch (JsonValidationException e) { + } catch (final JsonValidationException e) { throw new BadObjectSchemaKnownException( String.format("The provided configuration does not fulfill the specification. Errors: %s", e.getMessage()), e); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException(e); } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java index 0030496ffc93..c2b1e6ec7414 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java @@ -31,8 +31,8 @@ import io.airbyte.api.model.WorkspaceIdRequestBody; import io.airbyte.commons.io.FileTtlManager; import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.WorkspaceHelper; import io.airbyte.server.ConfigDumpExporter; @@ -53,11 +53,13 @@ public class ArchiveHandler { private final String version; private final ConfigDumpExporter configDumpExporter; private final ConfigDumpImporter configDumpImporter; + private final ConfigPersistence seed; private final FileTtlManager fileTtlManager; public ArchiveHandler(final String version, final ConfigRepository configRepository, final JobPersistence jobPersistence, + final ConfigPersistence seed, final WorkspaceHelper workspaceHelper, final FileTtlManager fileTtlManager, final SpecFetcher specFetcher) { @@ -65,16 +67,19 @@ public ArchiveHandler(final String version, version, fileTtlManager, new ConfigDumpExporter(configRepository, jobPersistence, workspaceHelper), - new ConfigDumpImporter(configRepository, jobPersistence, workspaceHelper, specFetcher)); + new ConfigDumpImporter(configRepository, jobPersistence, workspaceHelper, specFetcher), + seed); } public ArchiveHandler(final String version, final FileTtlManager fileTtlManager, final ConfigDumpExporter configDumpExporter, - final ConfigDumpImporter configDumpImporter) { + final ConfigDumpImporter configDumpImporter, + final ConfigPersistence seed) { this.version = version; this.configDumpExporter = configDumpExporter; this.configDumpImporter = configDumpImporter; + this.seed = seed; this.fileTtlManager = fileTtlManager; } @@ -95,13 +100,13 @@ public File exportData() { * @param workspaceIdRequestBody which is the target workspace to export * @return that lightweight tarball file */ - public File exportWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) { + public File exportWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) { final File archive; try { archive = configDumpExporter.exportWorkspace(workspaceIdRequestBody.getWorkspaceId()); fileTtlManager.register(archive.toPath()); return archive; - } catch (JsonValidationException | IOException | ConfigNotFoundException e) { + } catch (final JsonValidationException | IOException | ConfigNotFoundException e) { throw new InternalServerKnownException(String.format("Failed to export Workspace configuration due to: %s", e.getMessage())); } } @@ -112,15 +117,15 @@ public File exportWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) { * * @return a status object describing if import was successful or not. */ - public ImportRead importData(File archive) { + public ImportRead importData(final File archive) { try { - return importInternal(() -> configDumpImporter.importDataWithSeed(version, archive, YamlSeedConfigPersistence.get())); + return importInternal(() -> configDumpImporter.importDataWithSeed(version, archive, seed)); } finally { FileUtils.deleteQuietly(archive); } } - public UploadRead uploadArchiveResource(File archive) { + public UploadRead uploadArchiveResource(final File archive) { return configDumpImporter.uploadArchiveResource(archive); } @@ -132,7 +137,7 @@ public UploadRead uploadArchiveResource(File archive) { * * @return a status object describing if import was successful or not. */ - public ImportRead importIntoWorkspace(ImportRequestBody importRequestBody) { + public ImportRead importIntoWorkspace(final ImportRequestBody importRequestBody) { final File archive = configDumpImporter.getArchiveResource(importRequestBody.getResourceId()); try { return importInternal( @@ -142,12 +147,12 @@ public ImportRead importIntoWorkspace(ImportRequestBody importRequestBody) { } } - private ImportRead importInternal(importCall importCall) { + private ImportRead importInternal(final importCall importCall) { ImportRead result; try { importCall.importData(); result = new ImportRead().status(StatusEnum.SUCCEEDED); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.error("Import failed", e); result = new ImportRead().status(StatusEnum.FAILED).reason(e.getMessage()); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java index ce8184eab9d5..03406a42b21c 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java @@ -106,7 +106,6 @@ public void register(final Path path) {} @BeforeAll public static void dbSetup() { - YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); container = new PostgreSQLContainer<>("postgres:13-alpine") .withDatabaseName("airbyte") .withUsername("docker") @@ -124,7 +123,7 @@ public void setup() throws Exception { database = new JobsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); jobPersistence = new DefaultJobPersistence(database); database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); - seedPersistence = YamlSeedConfigPersistence.get(); + seedPersistence = YamlSeedConfigPersistence.getDefault(); configPersistence = new DatabaseConfigPersistence(database); configPersistence.replaceAllConfigs(Collections.emptyMap(), false); configPersistence.loadData(seedPersistence); @@ -141,6 +140,7 @@ public void setup() throws Exception { VERSION, configRepository, jobPersistence, + YamlSeedConfigPersistence.getDefault(), new WorkspaceHelper(configRepository, jobPersistence), new NoOpFileTtlManager(), specFetcher); diff --git a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java index be961d3c98a5..a3cac5eb2d89 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java @@ -123,22 +123,22 @@ public void testRunMigration() throws Exception { } } - private void assertPreMigrationConfigs(Path configRoot, JobPersistence jobPersistence) throws Exception { + private void assertPreMigrationConfigs(final Path configRoot, final JobPersistence jobPersistence) throws Exception { assertDatabaseVersion(jobPersistence, INITIAL_VERSION); - ConfigRepository configRepository = new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot)); - Map sourceDefinitionsBeforeMigration = configRepository.listStandardSources().stream() + final ConfigRepository configRepository = new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot)); + final Map sourceDefinitionsBeforeMigration = configRepository.listStandardSources().stream() .collect(Collectors.toMap(c -> c.getSourceDefinitionId().toString(), c -> c)); assertTrue(sourceDefinitionsBeforeMigration.containsKey(DEPRECATED_SOURCE_DEFINITION_NOT_BEING_USED)); assertTrue(sourceDefinitionsBeforeMigration.containsKey(DEPRECATED_SOURCE_DEFINITION_BEING_USED)); } - private void assertDatabaseVersion(JobPersistence jobPersistence, String version) throws IOException { + private void assertDatabaseVersion(final JobPersistence jobPersistence, final String version) throws IOException { final Optional versionFromDb = jobPersistence.getVersion(); assertTrue(versionFromDb.isPresent()); assertEquals(versionFromDb.get(), version); } - private void assertPostMigrationConfigs(Path importRoot) throws Exception { + private void assertPostMigrationConfigs(final Path importRoot) throws Exception { final ConfigRepository configRepository = new ConfigRepository(FileSystemConfigPersistence.createWithValidation(importRoot)); final UUID workspaceId = configRepository.listStandardWorkspaces(true).get(0).getWorkspaceId(); // originally the default workspace started with a hardcoded id. the migration in version 0.29.0 @@ -154,7 +154,7 @@ private void assertPostMigrationConfigs(Path importRoot) throws Exception { assertDestinationDefinitions(configRepository); } - private void assertSourceDefinitions(ConfigRepository configRepository) throws JsonValidationException, IOException { + private void assertSourceDefinitions(final ConfigRepository configRepository) throws JsonValidationException, IOException { final Map sourceDefinitions = configRepository.listStandardSources() .stream() .collect(Collectors.toMap(c -> c.getSourceDefinitionId().toString(), c -> c)); @@ -170,7 +170,7 @@ private void assertSourceDefinitions(ConfigRepository configRepository) throws J assertEquals("MySQL", mysqlDefinition.getName()); final StandardSourceDefinition postgresDefinition = sourceDefinitions.get("decd338e-5647-4c0b-adf4-da0e75f5a750"); - String[] tagBrokenAsArray = postgresDefinition.getDockerImageTag().replace(".", ",").split(","); + final String[] tagBrokenAsArray = postgresDefinition.getDockerImageTag().replace(".", ",").split(","); assertEquals(3, tagBrokenAsArray.length); assertTrue(Integer.parseInt(tagBrokenAsArray[0]) >= 0); assertTrue(Integer.parseInt(tagBrokenAsArray[1]) >= 3); @@ -178,7 +178,7 @@ private void assertSourceDefinitions(ConfigRepository configRepository) throws J assertTrue(postgresDefinition.getName().contains("Postgres")); } - private void assertDestinationDefinitions(ConfigRepository configRepository) throws JsonValidationException, IOException { + private void assertDestinationDefinitions(final ConfigRepository configRepository) throws JsonValidationException, IOException { final Map sourceDefinitions = configRepository.listStandardDestinationDefinitions() .stream() .collect(Collectors.toMap(c -> c.getDestinationDefinitionId().toString(), c -> c)); @@ -193,7 +193,7 @@ private void assertDestinationDefinitions(ConfigRepository configRepository) thr assertEquals("0.2.0", localCsvDefinition.getDockerImageTag()); final StandardDestinationDefinition snowflakeDefinition = sourceDefinitions.get("424892c4-daac-4491-b35d-c6688ba547ba"); - String[] tagBrokenAsArray = snowflakeDefinition.getDockerImageTag().replace(".", ",").split(","); + final String[] tagBrokenAsArray = snowflakeDefinition.getDockerImageTag().replace(".", ",").split(","); assertEquals(3, tagBrokenAsArray.length); assertTrue(Integer.parseInt(tagBrokenAsArray[0]) >= 0); assertTrue(Integer.parseInt(tagBrokenAsArray[1]) >= 3); @@ -201,12 +201,12 @@ private void assertDestinationDefinitions(ConfigRepository configRepository) thr assertTrue(snowflakeDefinition.getName().contains("Snowflake")); } - private void assertStandardSyncs(ConfigRepository configRepository, - StandardSyncOperation standardSyncOperation) + private void assertStandardSyncs(final ConfigRepository configRepository, + final StandardSyncOperation standardSyncOperation) throws ConfigNotFoundException, IOException, JsonValidationException { final List standardSyncs = configRepository.listStandardSyncs(); assertEquals(standardSyncs.size(), 2); - for (StandardSync standardSync : standardSyncs) { + for (final StandardSync standardSync : standardSyncs) { if (standardSync.getConnectionId().toString().equals("a294256f-1abe-4837-925f-91602c7207b4")) { assertEquals(standardSync.getPrefix(), ""); assertEquals(standardSync.getSourceId().toString(), "28ffee2b-372a-4f72-9b95-8ed56a8b99c5"); @@ -233,7 +233,7 @@ private void assertStandardSyncs(ConfigRepository configRepository, } @NotNull - private StandardSyncOperation assertSyncOperations(ConfigRepository configRepository) throws IOException, JsonValidationException { + private StandardSyncOperation assertSyncOperations(final ConfigRepository configRepository) throws IOException, JsonValidationException { final List standardSyncOperations = configRepository.listStandardSyncOperations(); assertEquals(standardSyncOperations.size(), 1); final StandardSyncOperation standardSyncOperation = standardSyncOperations.get(0); @@ -245,7 +245,7 @@ private StandardSyncOperation assertSyncOperations(ConfigRepository configReposi return standardSyncOperation; } - private void assertSources(ConfigRepository configRepository, UUID workspaceId) throws JsonValidationException, IOException { + private void assertSources(final ConfigRepository configRepository, final UUID workspaceId) throws JsonValidationException, IOException { final Map sources = configRepository.listSourceConnection() .stream() .collect(Collectors.toMap(sourceConnection -> sourceConnection.getSourceId().toString(), sourceConnection -> sourceConnection)); @@ -264,7 +264,7 @@ private void assertSources(ConfigRepository configRepository, UUID workspaceId) } - private void assertWorkspace(ConfigRepository configRepository, UUID workspaceId) throws JsonValidationException, IOException { + private void assertWorkspace(final ConfigRepository configRepository, final UUID workspaceId) throws JsonValidationException, IOException { final List standardWorkspaces = configRepository.listStandardWorkspaces(true); assertEquals(1, standardWorkspaces.size()); final StandardWorkspace workspace = standardWorkspaces.get(0); @@ -279,7 +279,7 @@ private void assertWorkspace(ConfigRepository configRepository, UUID workspaceId assertEquals(false, workspace.getDisplaySetupWizard()); } - private void assertDestinations(ConfigRepository configRepository, UUID workspaceId) throws JsonValidationException, IOException { + private void assertDestinations(final ConfigRepository configRepository, final UUID workspaceId) throws JsonValidationException, IOException { final List destinationConnections = configRepository.listDestinationConnection(); assertEquals(destinationConnections.size(), 2); for (final DestinationConnection destination : destinationConnections) { @@ -305,12 +305,12 @@ private void assertDestinations(ConfigRepository configRepository, UUID workspac } } - private void runMigration(JobPersistence jobPersistence, Path configRoot) throws Exception { + private void runMigration(final JobPersistence jobPersistence, final Path configRoot) throws Exception { try (final RunMigration runMigration = new RunMigration( jobPersistence, new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot)), TARGET_VERSION, - YamlSeedConfigPersistence.get(), + YamlSeedConfigPersistence.getDefault(), mock(SpecFetcher.class) // this test was disabled/broken when this fetcher mock was added. apologies if you have to fix this // in the future. )) { @@ -319,7 +319,7 @@ private void runMigration(JobPersistence jobPersistence, Path configRoot) throws } @SuppressWarnings("SameParameterValue") - private JobPersistence getJobPersistence(Database database, File file, String version) throws IOException { + private JobPersistence getJobPersistence(final Database database, final File file, final String version) throws IOException { final DefaultJobPersistence jobPersistence = new DefaultJobPersistence(database); final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), "db_init"); resourceToBeCleanedUp.add(tempFolder.toFile()); From 0ff8573f387b9f49fb1cf133e029e6641c7d2107 Mon Sep 17 00:00:00 2001 From: cgardens Date: Sat, 25 Sep 2021 05:34:04 -0700 Subject: [PATCH 3/3] remove unused invokation --- .../config/persistence/BaseDatabaseConfigPersistenceTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java index 3acc2075db56..ec9eb53496e9 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java @@ -56,7 +56,6 @@ public abstract class BaseDatabaseConfigPersistenceTest { @BeforeAll public static void dbSetup() { - YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); container = new PostgreSQLContainer<>("postgres:13-alpine") .withDatabaseName("airbyte") .withUsername("docker")