Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make the seed for YamlSeedConfigPersistence configurable #6409

Merged
merged 3 commits into from
Sep 25, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,45 @@
*/
public class YamlSeedConfigPersistence implements ConfigPersistence {

public static Class<?> DEFAULT_SEED_DEFINITION_RESOURCE_CLASS = SeedType.class;

private static final Map<AirbyteConfig, SeedType> CONFIG_SCHEMA_MAP = Map.of(
ConfigSchema.STANDARD_SOURCE_DEFINITION, SeedType.STANDARD_SOURCE_DEFINITION,
ConfigSchema.STANDARD_DESTINATION_DEFINITION, SeedType.STANDARD_DESTINATION_DEFINITION);

private static final YamlSeedConfigPersistence INSTANCE;
static {
try {
INSTANCE = new YamlSeedConfigPersistence();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static final Object lock = new Object();
private static YamlSeedConfigPersistence INSTANCE;

// A mapping from seed config type to config UUID to config.
private final ImmutableMap<SeedType, Map<String, JsonNode>> allSeedConfigs;

private YamlSeedConfigPersistence() throws IOException {
private YamlSeedConfigPersistence(final Class<?> seedDefinitionsResourceClass) throws IOException {
this.allSeedConfigs = ImmutableMap.<SeedType, Map<String, JsonNode>>builder()
.put(SeedType.STANDARD_SOURCE_DEFINITION, getConfigs(SeedType.STANDARD_SOURCE_DEFINITION))
.put(SeedType.STANDARD_DESTINATION_DEFINITION, getConfigs(SeedType.STANDARD_DESTINATION_DEFINITION))
.put(SeedType.STANDARD_SOURCE_DEFINITION, getConfigs(seedDefinitionsResourceClass, SeedType.STANDARD_SOURCE_DEFINITION))
.put(SeedType.STANDARD_DESTINATION_DEFINITION, getConfigs(seedDefinitionsResourceClass, SeedType.STANDARD_DESTINATION_DEFINITION))
.build();
}

public static void initialize(final Class<?> seedDefinitionsResourceClass) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why you need to do that but this complex object lifecycle has some code smell.

Why is the YamlSeedConfigPersistence a singleton? Are we ever using it without injecting?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that this class can be constructed differently, I think it should not be a singleton any more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool. thanks. this was my intuition too.

try {
INSTANCE = new YamlSeedConfigPersistence(seedDefinitionsResourceClass);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

public static YamlSeedConfigPersistence get() {
return INSTANCE;
synchronized (lock) {
if (INSTANCE == null) {
throw new IllegalStateException("YamlSeedConfigPersistence has not been initialized");
}
return INSTANCE;
}
}

@SuppressWarnings("UnstableApiUsage")
private static Map<String, JsonNode> getConfigs(SeedType seedType) throws IOException {
final URL url = Resources.getResource(SeedType.class, seedType.getResourcePath());
private static Map<String, JsonNode> getConfigs(final Class<?> seedDefinitionsResourceClass, final SeedType seedType) throws IOException {
final URL url = Resources.getResource(seedDefinitionsResourceClass, seedType.getResourcePath());
final String yamlString = Resources.toString(url, StandardCharsets.UTF_8);
final JsonNode configList = Yamls.deserialize(yamlString);
return MoreIterators.toList(configList.elements()).stream().collect(Collectors.toMap(
Expand All @@ -86,7 +95,7 @@ private static Map<String, JsonNode> getConfigs(SeedType seedType) throws IOExce
}

@Override
public <T> T getConfig(AirbyteConfig configType, String configId, Class<T> clazz)
public <T> T getConfig(final AirbyteConfig configType, final String configId, final Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException, IOException {
final Map<String, JsonNode> configs = allSeedConfigs.get(CONFIG_SCHEMA_MAP.get(configType));
if (configs == null) {
Expand All @@ -100,7 +109,7 @@ public <T> T getConfig(AirbyteConfig configType, String configId, Class<T> clazz
}

@Override
public <T> List<T> listConfigs(AirbyteConfig configType, Class<T> clazz) {
public <T> List<T> listConfigs(final AirbyteConfig configType, final Class<T> clazz) {
final Map<String, JsonNode> configs = allSeedConfigs.get(CONFIG_SCHEMA_MAP.get(configType));
if (configs == null) {
throw new UnsupportedOperationException("There is no seed for " + configType.name());
Expand All @@ -109,17 +118,17 @@ public <T> List<T> listConfigs(AirbyteConfig configType, Class<T> clazz) {
}

@Override
public <T> void writeConfig(AirbyteConfig configType, String configId, T config) {
public <T> void writeConfig(final AirbyteConfig configType, final String configId, final T config) {
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

@Override
public void deleteConfig(AirbyteConfig configType, String configId) {
public void deleteConfig(final AirbyteConfig configType, final String configId) {
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

@Override
public void replaceAllConfigs(Map<AirbyteConfig, Stream<?>> configs, boolean dryRun) {
public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final boolean dryRun) {
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

Expand All @@ -131,7 +140,7 @@ public Map<String, Stream<JsonNode>> dumpConfigs() {
}

@Override
public void loadData(ConfigPersistence seedPersistence) throws IOException {
public void loadData(final ConfigPersistence seedPersistence) throws IOException {
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public abstract class BaseDatabaseConfigPersistenceTest {

@BeforeAll
public static void dbSetup() {
YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be we have some smiple helper method to construct this class, like YamlSeedConfigPersistence.getDefault(), which automatically use the default resource class? Otherwise people won't remember which class to use.

Copy link
Contributor

@tuliren tuliren Sep 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This initialize method no longer exists. This line can be deleted. Otherwise code cannot compile.

Suggested change
YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);

container = new PostgreSQLContainer<>("postgres:13-alpine")
.withDatabaseName("airbyte")
.withUsername("docker")
Expand All @@ -75,7 +76,8 @@ public static void dbDown() {

static {
try {
ConfigPersistence seedPersistence = YamlSeedConfigPersistence.get();
YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
final ConfigPersistence seedPersistence = YamlSeedConfigPersistence.get();
SOURCE_GITHUB = seedPersistence
.getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, "ef69ef6e-aa7f-4af1-a01d-ef775033524e", StandardSourceDefinition.class);
SOURCE_POSTGRES = seedPersistence
Expand All @@ -84,47 +86,49 @@ public static void dbDown() {
.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "424892c4-daac-4491-b35d-c6688ba547ba", StandardDestinationDefinition.class);
DESTINATION_S3 = seedPersistence
.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "4816b78f-1489-44c1-9060-4b19d5fa9362", StandardDestinationDefinition.class);
} catch (Exception e) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

protected static void writeSource(ConfigPersistence configPersistence, StandardSourceDefinition source) throws Exception {
protected static void writeSource(final ConfigPersistence configPersistence, final StandardSourceDefinition source) throws Exception {
configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source);
}

protected static void writeDestination(ConfigPersistence configPersistence, StandardDestinationDefinition destination) throws Exception {
protected static void writeDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination)
throws Exception {
configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination);
}

protected static void deleteDestination(ConfigPersistence configPersistence, StandardDestinationDefinition destination) throws Exception {
protected static void deleteDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination)
throws Exception {
configPersistence.deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString());
}

protected Map<String, Set<JsonNode>> getMapWithSet(Map<String, Stream<JsonNode>> input) {
protected Map<String, Set<JsonNode>> getMapWithSet(final Map<String, Stream<JsonNode>> input) {
return input.entrySet().stream().collect(Collectors.toMap(
Entry::getKey,
e -> e.getValue().collect(Collectors.toSet())));
}

// assertEquals cannot correctly check the equality of two maps with stream values,
// so streams are converted to sets before being compared.
protected void assertSameConfigDump(Map<String, Stream<JsonNode>> expected, Map<String, Stream<JsonNode>> actual) {
protected void assertSameConfigDump(final Map<String, Stream<JsonNode>> expected, final Map<String, Stream<JsonNode>> actual) {
assertEquals(getMapWithSet(expected), getMapWithSet(actual));
}

protected void assertRecordCount(int expectedCount) throws Exception {
Result<Record1<Integer>> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table("airbyte_configs")).fetch());
protected void assertRecordCount(final int expectedCount) throws Exception {
final Result<Record1<Integer>> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table("airbyte_configs")).fetch());
assertEquals(expectedCount, recordCount.get(0).value1());
}

protected void assertHasSource(StandardSourceDefinition source) throws Exception {
protected void assertHasSource(final StandardSourceDefinition source) throws Exception {
assertEquals(source, configPersistence
.getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(),
StandardSourceDefinition.class));
}

protected void assertHasDestination(StandardDestinationDefinition destination) throws Exception {
protected void assertHasDestination(final StandardDestinationDefinition destination) throws Exception {
assertEquals(destination, configPersistence
.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(),
StandardDestinationDefinition.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class DatabaseConfigPersistenceLoadDataTest extends BaseDatabaseConfigPer

@BeforeAll
public static void setup() throws Exception {
YamlSeedConfigPersistence.initialize(YamlSeedConfigPersistence.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize();
configPersistence = spy(new DatabaseConfigPersistence(database));
database.query(ctx -> ctx.execute("TRUNCATE TABLE airbyte_configs"));
Expand Down Expand Up @@ -100,23 +101,23 @@ public void testUpdateConfigsInNonEmptyDatabase() throws Exception {
@DisplayName("When a connector is in use, its definition should not be updated")
public void testNoUpdateForUsedConnector() throws Exception {
// the seed has a newer version of s3 destination and github source
StandardDestinationDefinition destinationS3V2 = YamlSeedConfigPersistence.get()
final StandardDestinationDefinition destinationS3V2 = YamlSeedConfigPersistence.get()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test shows a good example why the static class variable should not be modified. It is very confusing that we need to call initialize in the setup method first before calling the get here. No one will remember to do this until they get the exception message.

.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "4816b78f-1489-44c1-9060-4b19d5fa9362", StandardDestinationDefinition.class)
.withDockerImageTag("10000.1.0");
when(seedPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class))
.thenReturn(Collections.singletonList(destinationS3V2));
StandardSourceDefinition sourceGithubV2 = YamlSeedConfigPersistence.get()
final StandardSourceDefinition sourceGithubV2 = YamlSeedConfigPersistence.get()
.getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, "ef69ef6e-aa7f-4af1-a01d-ef775033524e", StandardSourceDefinition.class)
.withDockerImageTag("10000.15.3");
when(seedPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class))
.thenReturn(Collections.singletonList(sourceGithubV2));

// create connections to mark the source and destination as in use
DestinationConnection s3Connection = new DestinationConnection()
final DestinationConnection s3Connection = new DestinationConnection()
.withDestinationId(UUID.randomUUID())
.withDestinationDefinitionId(destinationS3V2.getDestinationDefinitionId());
configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, s3Connection.getDestinationId().toString(), s3Connection);
SourceConnection githubConnection = new SourceConnection()
final SourceConnection githubConnection = new SourceConnection()
.withSourceId(UUID.randomUUID())
.withSourceDefinitionId(sourceGithubV2.getSourceDefinitionId());
configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, githubConnection.getSourceId().toString(), githubConnection);
Expand All @@ -132,7 +133,7 @@ public void testNoUpdateForUsedConnector() throws Exception {
@DisplayName("When a connector is not in use, its definition should be updated")
public void testUpdateForUnusedConnector() throws Exception {
// the seed has a newer version of snowflake destination
StandardDestinationDefinition snowflakeV2 = YamlSeedConfigPersistence.get()
final StandardDestinationDefinition snowflakeV2 = YamlSeedConfigPersistence.get()
.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "424892c4-daac-4491-b35d-c6688ba547ba", StandardDestinationDefinition.class)
.withDockerImageTag("10000.2.0");
when(seedPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,42 +122,42 @@ public void testDumpConfigs() throws Exception {
writeSource(configPersistence, SOURCE_GITHUB);
writeSource(configPersistence, SOURCE_POSTGRES);
writeDestination(configPersistence, DESTINATION_S3);
Map<String, Stream<JsonNode>> actual = configPersistence.dumpConfigs();
Map<String, Stream<JsonNode>> expected = Map.of(
final Map<String, Stream<JsonNode>> actual = configPersistence.dumpConfigs();
final Map<String, Stream<JsonNode>> expected = Map.of(
ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), Stream.of(Jsons.jsonNode(SOURCE_GITHUB), Jsons.jsonNode(SOURCE_POSTGRES)),
ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(), Stream.of(Jsons.jsonNode(DESTINATION_S3)));
assertSameConfigDump(expected, actual);
}

@Test
public void testGetConnectorRepositoryToInfoMap() throws Exception {
String connectorRepository = "airbyte/duplicated-connector";
String oldVersion = "0.1.10";
String newVersion = "0.2.0";
StandardSourceDefinition source1 = new StandardSourceDefinition()
final String connectorRepository = "airbyte/duplicated-connector";
final String oldVersion = "0.1.10";
final String newVersion = "0.2.0";
final StandardSourceDefinition source1 = new StandardSourceDefinition()
.withSourceDefinitionId(UUID.randomUUID())
.withDockerRepository(connectorRepository)
.withDockerImageTag(oldVersion);
StandardSourceDefinition source2 = new StandardSourceDefinition()
final StandardSourceDefinition source2 = new StandardSourceDefinition()
.withSourceDefinitionId(UUID.randomUUID())
.withDockerRepository(connectorRepository)
.withDockerImageTag(newVersion);
writeSource(configPersistence, source1);
writeSource(configPersistence, source2);
Map<String, ConnectorInfo> result = database.query(ctx -> configPersistence.getConnectorRepositoryToInfoMap(ctx));
final Map<String, ConnectorInfo> result = database.query(ctx -> configPersistence.getConnectorRepositoryToInfoMap(ctx));
// when there are duplicated connector definitions, the one with the latest version should be
// retrieved
assertEquals(newVersion, result.get(connectorRepository).dockerImageTag);
}

@Test
public void testInsertConfigRecord() throws Exception {
OffsetDateTime timestamp = OffsetDateTime.now();
UUID definitionId = UUID.randomUUID();
String connectorRepository = "airbyte/test-connector";
final OffsetDateTime timestamp = OffsetDateTime.now();
final UUID definitionId = UUID.randomUUID();
final String connectorRepository = "airbyte/test-connector";

// when the record does not exist, it is inserted
StandardSourceDefinition source1 = new StandardSourceDefinition()
final StandardSourceDefinition source1 = new StandardSourceDefinition()
.withSourceDefinitionId(definitionId)
.withDockerRepository(connectorRepository)
.withDockerImageTag("0.1.2");
Expand All @@ -175,7 +175,7 @@ public void testInsertConfigRecord() throws Exception {
assertHasSource(SOURCE_GITHUB);

// when the record already exists, it is ignored
StandardSourceDefinition source2 = new StandardSourceDefinition()
final StandardSourceDefinition source2 = new StandardSourceDefinition()
.withSourceDefinitionId(definitionId)
.withDockerRepository(connectorRepository)
.withDockerImageTag("0.1.5");
Expand All @@ -193,11 +193,11 @@ public void testInsertConfigRecord() throws Exception {

@Test
public void testUpdateConfigRecord() throws Exception {
OffsetDateTime timestamp = OffsetDateTime.now();
UUID definitionId = UUID.randomUUID();
String connectorRepository = "airbyte/test-connector";
final OffsetDateTime timestamp = OffsetDateTime.now();
final UUID definitionId = UUID.randomUUID();
final String connectorRepository = "airbyte/test-connector";

StandardSourceDefinition oldSource = new StandardSourceDefinition()
final StandardSourceDefinition oldSource = new StandardSourceDefinition()
.withSourceDefinitionId(definitionId)
.withDockerRepository(connectorRepository)
.withDockerImageTag("0.3.5");
Expand All @@ -208,7 +208,7 @@ public void testUpdateConfigRecord() throws Exception {
assertHasSource(oldSource);
assertHasSource(SOURCE_GITHUB);

StandardSourceDefinition newSource = new StandardSourceDefinition()
final StandardSourceDefinition newSource = new StandardSourceDefinition()
.withSourceDefinitionId(definitionId)
.withDockerRepository(connectorRepository)
.withDockerImageTag("0.3.5");
Expand All @@ -231,22 +231,22 @@ public void testHasNewVersion() {

@Test
public void testGetNewFields() {
JsonNode o1 = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }");
JsonNode o2 = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3 }");
final JsonNode o1 = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }");
final JsonNode o2 = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3 }");
assertEquals(Collections.emptySet(), DatabaseConfigPersistence.getNewFields(o1, o1));
assertEquals(Collections.singleton("field3"), DatabaseConfigPersistence.getNewFields(o1, o2));
assertEquals(Collections.singleton("field2"), DatabaseConfigPersistence.getNewFields(o2, o1));
}

@Test
public void testGetDefinitionWithNewFields() {
JsonNode current = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }");
JsonNode latest = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3, \"field4\": 4 }");
Set<String> newFields = Set.of("field3");
final JsonNode current = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }");
final JsonNode latest = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3, \"field4\": 4 }");
final Set<String> newFields = Set.of("field3");

assertEquals(current, DatabaseConfigPersistence.getDefinitionWithNewFields(current, latest, Collections.emptySet()));

JsonNode currentWithNewFields = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2, \"field3\": 3 }");
final JsonNode currentWithNewFields = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2, \"field3\": 3 }");
assertEquals(currentWithNewFields, DatabaseConfigPersistence.getDefinitionWithNewFields(current, latest, newFields));
}

Expand Down
Loading