Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validate classes in config persistence #10484

Merged
merged 4 commits into from
Mar 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface AirbyteConfig {
*/
File getConfigSchemaFile();

<T> Class<T> getClassName();

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.config;

import java.time.Instant;
import java.util.Objects;

public class ConfigWithMetadata<T> {

Expand All @@ -14,7 +15,7 @@ public class ConfigWithMetadata<T> {
private final Instant updatedAt;
private final T config;

public ConfigWithMetadata(String configId, String configType, Instant createdAt, Instant updatedAt, T config) {
public ConfigWithMetadata(final String configId, final String configType, final Instant createdAt, final Instant updatedAt, final T config) {
this.configId = configId;
this.configType = configType;
this.createdAt = createdAt;
Expand Down Expand Up @@ -42,4 +43,22 @@ public T getConfig() {
return config;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ConfigWithMetadata<?> that = (ConfigWithMetadata<?>) o;
return Objects.equals(configId, that.configId) && Objects.equals(configType, that.configType) && Objects.equals(
createdAt, that.createdAt) && Objects.equals(updatedAt, that.updatedAt) && Objects.equals(config, that.config);
}

@Override
public int hashCode() {
return Objects.hash(configId, configType, createdAt, updatedAt, config);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.client.util.Preconditions;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigWithMetadata;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Validates that the class of inputs and outputs matches the class specified in the AirbyteConfig
* enum. Helps avoid type mistakes, which can happen because this iface can't type check at compile
* time.
*/
public class ClassEnforcingConfigPersistence implements ConfigPersistence {

private final ConfigPersistence decoratedPersistence;

public ClassEnforcingConfigPersistence(final ConfigPersistence decoratedPersistence) {
this.decoratedPersistence = decoratedPersistence;
}

@Override
public <T> T getConfig(final AirbyteConfig configType, final String configId, final Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException, IOException {
Preconditions.checkArgument(configType.getClassName().equals(clazz));
return decoratedPersistence.getConfig(configType, configId, clazz);
}

@Override
public <T> List<T> listConfigs(final AirbyteConfig configType, final Class<T> clazz) throws JsonValidationException, IOException {
Preconditions.checkArgument(configType.getClassName().equals(clazz));
return decoratedPersistence.listConfigs(configType, clazz);
}

@Override
public <T> ConfigWithMetadata<T> getConfigWithMetadata(final AirbyteConfig configType, final String configId, final Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException, IOException {
Preconditions.checkArgument(configType.getClassName().equals(clazz));
return decoratedPersistence.getConfigWithMetadata(configType, configId, clazz);
}

@Override
public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConfig configType, final Class<T> clazz)
throws JsonValidationException, IOException {
Preconditions.checkArgument(configType.getClassName().equals(clazz));
return decoratedPersistence.listConfigsWithMetadata(configType, clazz);
}

@Override
public <T> void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws JsonValidationException, IOException {
Preconditions.checkArgument(configType.getClassName().equals(config.getClass()));
decoratedPersistence.writeConfig(configType, configId, config);
}

@Override
public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T> configs) throws IOException, JsonValidationException {
// attempt to check the input type. if it is empty, then there is nothing to check.
Preconditions.checkArgument(configs.isEmpty() || configType.getClassName().equals(new ArrayList<>(configs.values()).get(0).getClass()));
decoratedPersistence.writeConfigs(configType, configs);
}

@Override
public void deleteConfig(final AirbyteConfig configType, final String configId) throws ConfigNotFoundException, IOException {
decoratedPersistence.deleteConfig(configType, configId);
}

@Override
public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final boolean dryRun) throws IOException {
final Map<AirbyteConfig, Stream<?>> augmentedMap = new HashMap<>(configs).entrySet()
.stream()
.collect(Collectors.toMap(
Entry::getKey,
entry -> entry.getValue().peek(config -> Preconditions.checkArgument(entry.getKey().getClassName().equals(config.getClass())))));
decoratedPersistence.replaceAllConfigs(augmentedMap, dryRun);
}

@Override
public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
return decoratedPersistence.dumpConfigs();
}

@Override
public void loadData(final ConfigPersistence seedPersistence) throws IOException {
decoratedPersistence.loadData(seedPersistence);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,20 @@ public class DatabaseConfigPersistence implements ConfigPersistence {
private final FeatureFlags featureFlags;
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseConfigPersistence.class);

/**
* Entrypoint into DatabaseConfigPersistence. Except in testing, we should never be using it without
* it being decorated with validation classes.
*
* @param database - database where configs are stored
* @param jsonSecretsProcessor - for filtering secrets in export
* @param featureFlags - feature flags that govern secret export behavior
* @return database config persistence wrapped in validation decorators
*/
public static ConfigPersistence createWithValidation(final Database database,
final JsonSecretsProcessor jsonSecretsProcessor,
final FeatureFlags featureFlags) {
return new ValidatingConfigPersistence(new DatabaseConfigPersistence(database, jsonSecretsProcessor, featureFlags));
return new ClassEnforcingConfigPersistence(
new ValidatingConfigPersistence(new DatabaseConfigPersistence(database, jsonSecretsProcessor, featureFlags)));
}

public DatabaseConfigPersistence(final Database database, final JsonSecretsProcessor jsonSecretsProcessor, final FeatureFlags featureFlags) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// we force all interaction with disk storage to be effectively single threaded.
/**
* Validates that json input and outputs for the ConfigPersistence against their schemas.
*/
public class ValidatingConfigPersistence implements ConfigPersistence {

private final JsonSchemaValidator schemaValidator;
Expand Down Expand Up @@ -96,8 +100,18 @@ public void deleteConfig(final AirbyteConfig configType, final String configId)

@Override
public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final boolean dryRun) throws IOException {
// todo (cgardens) need to do validation here.
decoratedPersistence.replaceAllConfigs(configs, dryRun);
final Map<AirbyteConfig, Stream<?>> augmentedMap = new HashMap<>(configs).entrySet()
.stream()
.collect(Collectors.toMap(
Entry::getKey,
entry -> entry.getValue().peek(config -> {
try {
validateJson(config, entry.getKey());
} catch (final JsonValidationException e) {
throw new RuntimeException(e);
}
})));
decoratedPersistence.replaceAllConfigs(augmentedMap, dryRun);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigWithMetadata;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;

class ClassEnforcingConfigPersistenceTest {

public static final UUID UUID1 = UUID.randomUUID();
public static final Instant INSTANT = Instant.now();
public static final StandardWorkspace WORKSPACE = new StandardWorkspace();
public static final StandardSync STANDARD_SYNC = new StandardSync().withConnectionId(UUID1);

private ClassEnforcingConfigPersistence configPersistence;
private ConfigPersistence decoratedConfigPersistence;

@BeforeEach
void setUp() {
decoratedConfigPersistence = mock(ConfigPersistence.class);
configPersistence = new ClassEnforcingConfigPersistence(decoratedConfigPersistence);
}

@Test
void testWriteConfigSuccess() throws IOException, JsonValidationException {
configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), STANDARD_SYNC);
verify(decoratedConfigPersistence).writeConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), STANDARD_SYNC);
}

@Test
void testWriteConfigFailure() {
assertThrows(IllegalArgumentException.class,
() -> configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), WORKSPACE));
verifyNoInteractions(decoratedConfigPersistence);
}

@Test
void testWriteConfigsSuccess() throws IOException, JsonValidationException {
final Map<String, StandardSync> configs = ImmutableMap.of(UUID1.toString(), STANDARD_SYNC);
configPersistence.writeConfigs(ConfigSchema.STANDARD_SYNC, configs);
verify(decoratedConfigPersistence).writeConfigs(ConfigSchema.STANDARD_SYNC, configs);
}

@Test
void testWriteConfigsFailure() {
final Map<String, StandardWorkspace> configs = ImmutableMap.of(UUID1.toString(), WORKSPACE);
assertThrows(IllegalArgumentException.class, () -> configPersistence.writeConfigs(ConfigSchema.STANDARD_SYNC, configs));
verifyNoInteractions(decoratedConfigPersistence);
}

@Test
void testGetConfigSuccess() throws IOException, JsonValidationException, ConfigNotFoundException {
when(decoratedConfigPersistence.getConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardSync.class))
.thenReturn(STANDARD_SYNC);
assertEquals(STANDARD_SYNC, configPersistence.getConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardSync.class));
}

@Test
void testGetConfigFailure() {
assertThrows(IllegalArgumentException.class,
() -> configPersistence.getConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardWorkspace.class));
verifyNoInteractions(decoratedConfigPersistence);
}

@Test
void testListConfigsSuccess() throws IOException, JsonValidationException {
when(decoratedConfigPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class)).thenReturn(List.of(STANDARD_SYNC));
assertEquals(List.of(STANDARD_SYNC), configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class));
}

@Test
void testListConfigsFailure() {
assertThrows(IllegalArgumentException.class,
() -> configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardWorkspace.class));
verifyNoInteractions(decoratedConfigPersistence);
}

@Test
void testGetConfigWithMetadataSuccess() throws IOException, JsonValidationException, ConfigNotFoundException {
when(decoratedConfigPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardSync.class))
.thenReturn(withMetadata(STANDARD_SYNC));
assertEquals(withMetadata(STANDARD_SYNC),
configPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardSync.class));
}

@Test
void testGetConfigWithMetadataFailure() {
assertThrows(IllegalArgumentException.class,
() -> configPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardWorkspace.class));
verifyNoInteractions(decoratedConfigPersistence);
}

@Test
void testListConfigsWithMetadataSuccess() throws IOException, JsonValidationException {
when(decoratedConfigPersistence.listConfigsWithMetadata(ConfigSchema.STANDARD_SYNC, StandardSync.class))
.thenReturn(List.of(withMetadata(STANDARD_SYNC)));
assertEquals(
List.of(withMetadata(STANDARD_SYNC)),
configPersistence.listConfigsWithMetadata(ConfigSchema.STANDARD_SYNC, StandardSync.class));
}

@Test
void testListConfigsWithMetadataFailure() {
assertThrows(IllegalArgumentException.class,
() -> configPersistence.listConfigsWithMetadata(ConfigSchema.STANDARD_SYNC, StandardWorkspace.class));
verifyNoInteractions(decoratedConfigPersistence);
}

@Test
void testReplaceAllConfigsSuccess() throws IOException, JsonValidationException {
consumeConfigInputStreams(decoratedConfigPersistence);
final Map<AirbyteConfig, Stream<?>> configs = ImmutableMap.of(ConfigSchema.STANDARD_SYNC, Stream.of(STANDARD_SYNC));
configPersistence.replaceAllConfigs(configs, false);
verify(decoratedConfigPersistence).replaceAllConfigs(any(), eq(false));
}

@Test
void testReplaceAllConfigsFailure() throws IOException {
consumeConfigInputStreams(decoratedConfigPersistence);
final Map<AirbyteConfig, Stream<?>> configs = ImmutableMap.of(ConfigSchema.STANDARD_SYNC, Stream.of(WORKSPACE));
assertThrows(IllegalArgumentException.class, () -> configPersistence.replaceAllConfigs(configs, false));
verify(decoratedConfigPersistence).replaceAllConfigs(any(), eq(false));
}

/**
* Consumes all streams input via replaceAllConfigs. This will trigger any exceptions that are
* thrown during processing.
*
* @param configPersistence - config persistence mock where this runs.
*/
private static void consumeConfigInputStreams(final ConfigPersistence configPersistence) throws IOException {
doAnswer((Answer<Void>) invocation -> {
final Map<AirbyteConfig, Stream<?>> argument = invocation.getArgument(0);
// force the streams to be consumed so that we can verify the exception was thrown.
argument.values().forEach(entry -> entry.collect(Collectors.toList()));
return null;
}).when(configPersistence).replaceAllConfigs(any(), eq(false));
}

@SuppressWarnings("SameParameterValue")
private static ConfigWithMetadata<StandardSync> withMetadata(final StandardSync actorDefinition) {
return new ConfigWithMetadata<>(actorDefinition.getConnectionId().toString(),
ConfigSchema.STANDARD_SYNC.name(),
INSTANT,
INSTANT,
actorDefinition);
}

}
Loading