Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make the seed for YamlSeedConfigPersistence configurable #6409

Merged
merged 3 commits into from
Sep 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,33 @@
*/
public class YamlSeedConfigPersistence implements ConfigPersistence {

public static Class<?> DEFAULT_SEED_DEFINITION_RESOURCE_CLASS = SeedType.class;

private static final Map<AirbyteConfig, SeedType> CONFIG_SCHEMA_MAP = Map.of(
ConfigSchema.STANDARD_SOURCE_DEFINITION, SeedType.STANDARD_SOURCE_DEFINITION,
ConfigSchema.STANDARD_DESTINATION_DEFINITION, SeedType.STANDARD_DESTINATION_DEFINITION);

private static final YamlSeedConfigPersistence INSTANCE;
static {
try {
INSTANCE = new YamlSeedConfigPersistence();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

// A mapping from seed config type to config UUID to config.
private final ImmutableMap<SeedType, Map<String, JsonNode>> allSeedConfigs;

private YamlSeedConfigPersistence() throws IOException {
this.allSeedConfigs = ImmutableMap.<SeedType, Map<String, JsonNode>>builder()
.put(SeedType.STANDARD_SOURCE_DEFINITION, getConfigs(SeedType.STANDARD_SOURCE_DEFINITION))
.put(SeedType.STANDARD_DESTINATION_DEFINITION, getConfigs(SeedType.STANDARD_DESTINATION_DEFINITION))
.build();
public static YamlSeedConfigPersistence getDefault() throws IOException {
return new YamlSeedConfigPersistence(DEFAULT_SEED_DEFINITION_RESOURCE_CLASS);
}

public static YamlSeedConfigPersistence get(final Class<?> seedDefinitionsResourceClass) throws IOException {
return new YamlSeedConfigPersistence(seedDefinitionsResourceClass);
}

public static YamlSeedConfigPersistence get() {
return INSTANCE;
private YamlSeedConfigPersistence(final Class<?> seedDefinitionsResourceClass) throws IOException {
this.allSeedConfigs = ImmutableMap.<SeedType, Map<String, JsonNode>>builder()
.put(SeedType.STANDARD_SOURCE_DEFINITION, getConfigs(seedDefinitionsResourceClass, SeedType.STANDARD_SOURCE_DEFINITION))
.put(SeedType.STANDARD_DESTINATION_DEFINITION, getConfigs(seedDefinitionsResourceClass, SeedType.STANDARD_DESTINATION_DEFINITION))
.build();
}

@SuppressWarnings("UnstableApiUsage")
private static Map<String, JsonNode> getConfigs(SeedType seedType) throws IOException {
final URL url = Resources.getResource(SeedType.class, seedType.getResourcePath());
private static Map<String, JsonNode> getConfigs(final Class<?> seedDefinitionsResourceClass, final SeedType seedType) throws IOException {
final URL url = Resources.getResource(seedDefinitionsResourceClass, seedType.getResourcePath());
final String yamlString = Resources.toString(url, StandardCharsets.UTF_8);
final JsonNode configList = Yamls.deserialize(yamlString);
return MoreIterators.toList(configList.elements()).stream().collect(Collectors.toMap(
Expand All @@ -86,7 +83,7 @@ private static Map<String, JsonNode> getConfigs(SeedType seedType) throws IOExce
}

@Override
public <T> T getConfig(AirbyteConfig configType, String configId, Class<T> clazz)
public <T> T getConfig(final AirbyteConfig configType, final String configId, final Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException, IOException {
final Map<String, JsonNode> configs = allSeedConfigs.get(CONFIG_SCHEMA_MAP.get(configType));
if (configs == null) {
Expand All @@ -100,7 +97,7 @@ public <T> T getConfig(AirbyteConfig configType, String configId, Class<T> clazz
}

@Override
public <T> List<T> listConfigs(AirbyteConfig configType, Class<T> clazz) {
public <T> List<T> listConfigs(final AirbyteConfig configType, final Class<T> clazz) {
final Map<String, JsonNode> configs = allSeedConfigs.get(CONFIG_SCHEMA_MAP.get(configType));
if (configs == null) {
throw new UnsupportedOperationException("There is no seed for " + configType.name());
Expand All @@ -109,17 +106,17 @@ public <T> List<T> listConfigs(AirbyteConfig configType, Class<T> clazz) {
}

@Override
public <T> void writeConfig(AirbyteConfig configType, String configId, T config) {
public <T> void writeConfig(final AirbyteConfig configType, final String configId, final T config) {
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

@Override
public void deleteConfig(AirbyteConfig configType, String configId) {
public void deleteConfig(final AirbyteConfig configType, final String configId) {
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

@Override
public void replaceAllConfigs(Map<AirbyteConfig, Stream<?>> configs, boolean dryRun) {
public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final boolean dryRun) {
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

Expand All @@ -131,7 +128,7 @@ public Map<String, Stream<JsonNode>> dumpConfigs() {
}

@Override
public void loadData(ConfigPersistence seedPersistence) throws IOException {
public void loadData(final ConfigPersistence seedPersistence) throws IOException {
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static void dbDown() {

static {
try {
ConfigPersistence seedPersistence = YamlSeedConfigPersistence.get();
final ConfigPersistence seedPersistence = YamlSeedConfigPersistence.getDefault();
SOURCE_GITHUB = seedPersistence
.getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, "ef69ef6e-aa7f-4af1-a01d-ef775033524e", StandardSourceDefinition.class);
SOURCE_POSTGRES = seedPersistence
Expand All @@ -84,47 +84,49 @@ public static void dbDown() {
.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "424892c4-daac-4491-b35d-c6688ba547ba", StandardDestinationDefinition.class);
DESTINATION_S3 = seedPersistence
.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "4816b78f-1489-44c1-9060-4b19d5fa9362", StandardDestinationDefinition.class);
} catch (Exception e) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

protected static void writeSource(ConfigPersistence configPersistence, StandardSourceDefinition source) throws Exception {
protected static void writeSource(final ConfigPersistence configPersistence, final StandardSourceDefinition source) throws Exception {
configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source);
}

protected static void writeDestination(ConfigPersistence configPersistence, StandardDestinationDefinition destination) throws Exception {
protected static void writeDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination)
throws Exception {
configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination);
}

protected static void deleteDestination(ConfigPersistence configPersistence, StandardDestinationDefinition destination) throws Exception {
protected static void deleteDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination)
throws Exception {
configPersistence.deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString());
}

protected Map<String, Set<JsonNode>> getMapWithSet(Map<String, Stream<JsonNode>> input) {
protected Map<String, Set<JsonNode>> getMapWithSet(final Map<String, Stream<JsonNode>> input) {
return input.entrySet().stream().collect(Collectors.toMap(
Entry::getKey,
e -> e.getValue().collect(Collectors.toSet())));
}

// assertEquals cannot correctly check the equality of two maps with stream values,
// so streams are converted to sets before being compared.
protected void assertSameConfigDump(Map<String, Stream<JsonNode>> expected, Map<String, Stream<JsonNode>> actual) {
protected void assertSameConfigDump(final Map<String, Stream<JsonNode>> expected, final Map<String, Stream<JsonNode>> actual) {
assertEquals(getMapWithSet(expected), getMapWithSet(actual));
}

protected void assertRecordCount(int expectedCount) throws Exception {
Result<Record1<Integer>> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table("airbyte_configs")).fetch());
protected void assertRecordCount(final int expectedCount) throws Exception {
final Result<Record1<Integer>> recordCount = database.query(ctx -> ctx.select(count(asterisk())).from(table("airbyte_configs")).fetch());
assertEquals(expectedCount, recordCount.get(0).value1());
}

protected void assertHasSource(StandardSourceDefinition source) throws Exception {
protected void assertHasSource(final StandardSourceDefinition source) throws Exception {
assertEquals(source, configPersistence
.getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(),
StandardSourceDefinition.class));
}

protected void assertHasDestination(StandardDestinationDefinition destination) throws Exception {
protected void assertHasDestination(final StandardDestinationDefinition destination) throws Exception {
assertEquals(destination, configPersistence
.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(),
StandardDestinationDefinition.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,23 @@ public void testUpdateConfigsInNonEmptyDatabase() throws Exception {
@DisplayName("When a connector is in use, its definition should not be updated")
public void testNoUpdateForUsedConnector() throws Exception {
// the seed has a newer version of s3 destination and github source
StandardDestinationDefinition destinationS3V2 = YamlSeedConfigPersistence.get()
final StandardDestinationDefinition destinationS3V2 = YamlSeedConfigPersistence.getDefault()
.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "4816b78f-1489-44c1-9060-4b19d5fa9362", StandardDestinationDefinition.class)
.withDockerImageTag("10000.1.0");
when(seedPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class))
.thenReturn(Collections.singletonList(destinationS3V2));
StandardSourceDefinition sourceGithubV2 = YamlSeedConfigPersistence.get()
final StandardSourceDefinition sourceGithubV2 = YamlSeedConfigPersistence.getDefault()
.getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, "ef69ef6e-aa7f-4af1-a01d-ef775033524e", StandardSourceDefinition.class)
.withDockerImageTag("10000.15.3");
when(seedPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class))
.thenReturn(Collections.singletonList(sourceGithubV2));

// create connections to mark the source and destination as in use
DestinationConnection s3Connection = new DestinationConnection()
final DestinationConnection s3Connection = new DestinationConnection()
.withDestinationId(UUID.randomUUID())
.withDestinationDefinitionId(destinationS3V2.getDestinationDefinitionId());
configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, s3Connection.getDestinationId().toString(), s3Connection);
SourceConnection githubConnection = new SourceConnection()
final SourceConnection githubConnection = new SourceConnection()
.withSourceId(UUID.randomUUID())
.withSourceDefinitionId(sourceGithubV2.getSourceDefinitionId());
configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, githubConnection.getSourceId().toString(), githubConnection);
Expand All @@ -132,7 +132,7 @@ public void testNoUpdateForUsedConnector() throws Exception {
@DisplayName("When a connector is not in use, its definition should be updated")
public void testUpdateForUnusedConnector() throws Exception {
// the seed has a newer version of snowflake destination
StandardDestinationDefinition snowflakeV2 = YamlSeedConfigPersistence.get()
final StandardDestinationDefinition snowflakeV2 = YamlSeedConfigPersistence.getDefault()
.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, "424892c4-daac-4491-b35d-c6688ba547ba", StandardDestinationDefinition.class)
.withDockerImageTag("10000.2.0");
when(seedPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,42 +122,42 @@ public void testDumpConfigs() throws Exception {
writeSource(configPersistence, SOURCE_GITHUB);
writeSource(configPersistence, SOURCE_POSTGRES);
writeDestination(configPersistence, DESTINATION_S3);
Map<String, Stream<JsonNode>> actual = configPersistence.dumpConfigs();
Map<String, Stream<JsonNode>> expected = Map.of(
final Map<String, Stream<JsonNode>> actual = configPersistence.dumpConfigs();
final Map<String, Stream<JsonNode>> expected = Map.of(
ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), Stream.of(Jsons.jsonNode(SOURCE_GITHUB), Jsons.jsonNode(SOURCE_POSTGRES)),
ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(), Stream.of(Jsons.jsonNode(DESTINATION_S3)));
assertSameConfigDump(expected, actual);
}

@Test
public void testGetConnectorRepositoryToInfoMap() throws Exception {
String connectorRepository = "airbyte/duplicated-connector";
String oldVersion = "0.1.10";
String newVersion = "0.2.0";
StandardSourceDefinition source1 = new StandardSourceDefinition()
final String connectorRepository = "airbyte/duplicated-connector";
final String oldVersion = "0.1.10";
final String newVersion = "0.2.0";
final StandardSourceDefinition source1 = new StandardSourceDefinition()
.withSourceDefinitionId(UUID.randomUUID())
.withDockerRepository(connectorRepository)
.withDockerImageTag(oldVersion);
StandardSourceDefinition source2 = new StandardSourceDefinition()
final StandardSourceDefinition source2 = new StandardSourceDefinition()
.withSourceDefinitionId(UUID.randomUUID())
.withDockerRepository(connectorRepository)
.withDockerImageTag(newVersion);
writeSource(configPersistence, source1);
writeSource(configPersistence, source2);
Map<String, ConnectorInfo> result = database.query(ctx -> configPersistence.getConnectorRepositoryToInfoMap(ctx));
final Map<String, ConnectorInfo> result = database.query(ctx -> configPersistence.getConnectorRepositoryToInfoMap(ctx));
// when there are duplicated connector definitions, the one with the latest version should be
// retrieved
assertEquals(newVersion, result.get(connectorRepository).dockerImageTag);
}

@Test
public void testInsertConfigRecord() throws Exception {
OffsetDateTime timestamp = OffsetDateTime.now();
UUID definitionId = UUID.randomUUID();
String connectorRepository = "airbyte/test-connector";
final OffsetDateTime timestamp = OffsetDateTime.now();
final UUID definitionId = UUID.randomUUID();
final String connectorRepository = "airbyte/test-connector";

// when the record does not exist, it is inserted
StandardSourceDefinition source1 = new StandardSourceDefinition()
final StandardSourceDefinition source1 = new StandardSourceDefinition()
.withSourceDefinitionId(definitionId)
.withDockerRepository(connectorRepository)
.withDockerImageTag("0.1.2");
Expand All @@ -175,7 +175,7 @@ public void testInsertConfigRecord() throws Exception {
assertHasSource(SOURCE_GITHUB);

// when the record already exists, it is ignored
StandardSourceDefinition source2 = new StandardSourceDefinition()
final StandardSourceDefinition source2 = new StandardSourceDefinition()
.withSourceDefinitionId(definitionId)
.withDockerRepository(connectorRepository)
.withDockerImageTag("0.1.5");
Expand All @@ -193,11 +193,11 @@ public void testInsertConfigRecord() throws Exception {

@Test
public void testUpdateConfigRecord() throws Exception {
OffsetDateTime timestamp = OffsetDateTime.now();
UUID definitionId = UUID.randomUUID();
String connectorRepository = "airbyte/test-connector";
final OffsetDateTime timestamp = OffsetDateTime.now();
final UUID definitionId = UUID.randomUUID();
final String connectorRepository = "airbyte/test-connector";

StandardSourceDefinition oldSource = new StandardSourceDefinition()
final StandardSourceDefinition oldSource = new StandardSourceDefinition()
.withSourceDefinitionId(definitionId)
.withDockerRepository(connectorRepository)
.withDockerImageTag("0.3.5");
Expand All @@ -208,7 +208,7 @@ public void testUpdateConfigRecord() throws Exception {
assertHasSource(oldSource);
assertHasSource(SOURCE_GITHUB);

StandardSourceDefinition newSource = new StandardSourceDefinition()
final StandardSourceDefinition newSource = new StandardSourceDefinition()
.withSourceDefinitionId(definitionId)
.withDockerRepository(connectorRepository)
.withDockerImageTag("0.3.5");
Expand All @@ -231,22 +231,22 @@ public void testHasNewVersion() {

@Test
public void testGetNewFields() {
JsonNode o1 = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }");
JsonNode o2 = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3 }");
final JsonNode o1 = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }");
final JsonNode o2 = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3 }");
assertEquals(Collections.emptySet(), DatabaseConfigPersistence.getNewFields(o1, o1));
assertEquals(Collections.singleton("field3"), DatabaseConfigPersistence.getNewFields(o1, o2));
assertEquals(Collections.singleton("field2"), DatabaseConfigPersistence.getNewFields(o2, o1));
}

@Test
public void testGetDefinitionWithNewFields() {
JsonNode current = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }");
JsonNode latest = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3, \"field4\": 4 }");
Set<String> newFields = Set.of("field3");
final JsonNode current = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }");
final JsonNode latest = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3, \"field4\": 4 }");
final Set<String> newFields = Set.of("field3");

assertEquals(current, DatabaseConfigPersistence.getDefinitionWithNewFields(current, latest, Collections.emptySet()));

JsonNode currentWithNewFields = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2, \"field3\": 3 }");
final JsonNode currentWithNewFields = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2, \"field3\": 3 }");
assertEquals(currentWithNewFields, DatabaseConfigPersistence.getDefinitionWithNewFields(current, latest, newFields));
}

Expand Down
Loading