From bfb9ea5b74810fcadf706837817490c93d8fb766 Mon Sep 17 00:00:00 2001 From: cgardens Date: Sun, 20 Feb 2022 11:49:26 -0800 Subject: [PATCH 1/4] validate classes in config persistence --- .../java/io/airbyte/config/AirbyteConfig.java | 2 + .../io/airbyte/config/ConfigWithMetadata.java | 21 ++- .../ClassEnforcingConfigPersistence.java | 93 ++++++++++++ .../DatabaseConfigPersistence.java | 4 +- .../ValidatingConfigPersistence.java | 4 +- .../ClassEnforcingConfigPersistenceTest.java | 138 ++++++++++++++++++ .../ValidatingConfigPersistenceTest.java | 56 +++++++ 7 files changed, 315 insertions(+), 3 deletions(-) create mode 100644 airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistence.java create mode 100644 airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistenceTest.java diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfig.java b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfig.java index e614d9c3c391..26c88931a4a5 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfig.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteConfig.java @@ -28,4 +28,6 @@ public interface AirbyteConfig { */ File getConfigSchemaFile(); + Class getClassName(); + } diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/ConfigWithMetadata.java b/airbyte-config/models/src/main/java/io/airbyte/config/ConfigWithMetadata.java index 21adc92856be..c46c6acceda6 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/ConfigWithMetadata.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/ConfigWithMetadata.java @@ -5,6 +5,7 @@ package io.airbyte.config; import java.time.Instant; +import java.util.Objects; public class ConfigWithMetadata { @@ -14,7 +15,7 @@ public class ConfigWithMetadata { private final Instant updatedAt; private final T config; - public ConfigWithMetadata(String configId, String configType, Instant createdAt, Instant updatedAt, T config) { + public ConfigWithMetadata(final String configId, final String configType, final Instant createdAt, final Instant updatedAt, final T config) { this.configId = configId; this.configType = configType; this.createdAt = createdAt; @@ -42,4 +43,22 @@ public T getConfig() { return config; } + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ConfigWithMetadata that = (ConfigWithMetadata) o; + return Objects.equals(configId, that.configId) && Objects.equals(configType, that.configType) && Objects.equals( + createdAt, that.createdAt) && Objects.equals(updatedAt, that.updatedAt) && Objects.equals(config, that.config); + } + + @Override + public int hashCode() { + return Objects.hash(configId, configType, createdAt, updatedAt, config); + } + } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistence.java new file mode 100644 index 000000000000..faba8538561d --- /dev/null +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistence.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.config.persistence; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.api.client.util.Preconditions; +import io.airbyte.config.AirbyteConfig; +import io.airbyte.config.ConfigWithMetadata; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +/** + * Validates that the class of inputs and outputs matches the class specified in the AirbyteConfig + * enum. Helps avoid type mistakes, which can happen because this iface can't type check at compile + * time. + */ +public class ClassEnforcingConfigPersistence implements ConfigPersistence { + + private final ConfigPersistence decoratedPersistence; + + public ClassEnforcingConfigPersistence(final ConfigPersistence decoratedPersistence) { + this.decoratedPersistence = decoratedPersistence; + } + + @Override + public T getConfig(final AirbyteConfig configType, final String configId, final Class clazz) + throws ConfigNotFoundException, JsonValidationException, IOException { + Preconditions.checkArgument(configType.getClassName().equals(clazz)); + return decoratedPersistence.getConfig(configType, configId, clazz); + } + + @Override + public List listConfigs(final AirbyteConfig configType, final Class clazz) throws JsonValidationException, IOException { + Preconditions.checkArgument(configType.getClassName().equals(clazz)); + return decoratedPersistence.listConfigs(configType, clazz); + } + + @Override + public ConfigWithMetadata getConfigWithMetadata(final AirbyteConfig configType, final String configId, final Class clazz) + throws ConfigNotFoundException, JsonValidationException, IOException { + Preconditions.checkArgument(configType.getClassName().equals(clazz)); + return decoratedPersistence.getConfigWithMetadata(configType, configId, clazz); + } + + @Override + public List> listConfigsWithMetadata(final AirbyteConfig configType, final Class clazz) + throws JsonValidationException, IOException { + Preconditions.checkArgument(configType.getClassName().equals(clazz)); + return decoratedPersistence.listConfigsWithMetadata(configType, clazz); + } + + @Override + public void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws JsonValidationException, IOException { + Preconditions.checkArgument(configType.getClassName().equals(config.getClass())); + decoratedPersistence.writeConfig(configType, configId, config); + } + + @Override + public void writeConfigs(final AirbyteConfig configType, final Map configs) throws IOException, JsonValidationException { + // attempt to check the input type. if it is empty, then there is nothing to check. + Preconditions.checkArgument(configs.isEmpty() || configType.getClassName().equals(new ArrayList<>(configs.values()).get(0).getClass())); + decoratedPersistence.writeConfigs(configType, configs); + } + + @Override + public void deleteConfig(final AirbyteConfig configType, final String configId) throws ConfigNotFoundException, IOException { + decoratedPersistence.deleteConfig(configType, configId); + } + + @Override + public void replaceAllConfigs(final Map> configs, final boolean dryRun) throws IOException { + // todo (cgardens) need to do class enforcement here here. + decoratedPersistence.replaceAllConfigs(configs, dryRun); + } + + @Override + public Map> dumpConfigs() throws IOException { + return decoratedPersistence.dumpConfigs(); + } + + @Override + public void loadData(final ConfigPersistence seedPersistence) throws IOException { + // todo (cgardens) need to do class enforcement here here. + decoratedPersistence.loadData(seedPersistence); + } + +} diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index cf81750dc74e..f40a89e42b67 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -88,7 +88,8 @@ public class DatabaseConfigPersistence implements ConfigPersistence { public static ConfigPersistence createWithValidation(final Database database, final JsonSecretsProcessor jsonSecretsProcessor, final FeatureFlags featureFlags) { - return new ValidatingConfigPersistence(new DatabaseConfigPersistence(database, jsonSecretsProcessor, featureFlags)); + return new ClassEnforcingConfigPersistence( + new ValidatingConfigPersistence(new DatabaseConfigPersistence(database, jsonSecretsProcessor, featureFlags))); } public DatabaseConfigPersistence(final Database database, final JsonSecretsProcessor jsonSecretsProcessor, final FeatureFlags featureFlags) { @@ -1392,6 +1393,7 @@ private void deleteStandardSync(final String configId) throws IOException { }); } + // todo (cgardens) - how to protect types here? @Override public void replaceAllConfigs(final Map> configs, final boolean dryRun) throws IOException { if (dryRun) { diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java index 5cd4dd594849..7cd9a21227e0 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java @@ -16,7 +16,9 @@ import java.util.Map; import java.util.stream.Stream; -// we force all interaction with disk storage to be effectively single threaded. +/** + * Validates that json input and outputs for the ConfigPersistence against their schemas. + */ public class ValidatingConfigPersistence implements ConfigPersistence { private final JsonSchemaValidator schemaValidator; diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistenceTest.java new file mode 100644 index 000000000000..f4e5d122d58d --- /dev/null +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistenceTest.java @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.config.persistence; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.ConfigWithMetadata; +import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ClassEnforcingConfigPersistenceTest { + + public static final UUID UUID1 = UUID.randomUUID(); + public static final Instant INSTANT = Instant.now(); + public static final StandardWorkspace WORKSPACE = new StandardWorkspace(); + public static final StandardSync STANDARD_SYNC = new StandardSync().withConnectionId(UUID1); + + private ClassEnforcingConfigPersistence configPersistence; + private ConfigPersistence decoratedConfigPersistence; + + @BeforeEach + void setUp() { + decoratedConfigPersistence = mock(ConfigPersistence.class); + configPersistence = new ClassEnforcingConfigPersistence(decoratedConfigPersistence); + } + + @Test + void testWriteConfigSuccess() throws IOException, JsonValidationException { + configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), STANDARD_SYNC); + verify(decoratedConfigPersistence).writeConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), STANDARD_SYNC); + } + + @Test + void testWriteConfigFailure() { + assertThrows(IllegalArgumentException.class, + () -> configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), WORKSPACE)); + verifyNoInteractions(decoratedConfigPersistence); + } + + @Test + void testWriteConfigsSuccess() throws IOException, JsonValidationException { + final Map configs = ImmutableMap.of(UUID1.toString(), STANDARD_SYNC); + configPersistence.writeConfigs(ConfigSchema.STANDARD_SYNC, configs); + verify(decoratedConfigPersistence).writeConfigs(ConfigSchema.STANDARD_SYNC, configs); + } + + @Test + void testWriteConfigsFailure() { + final Map configs = ImmutableMap.of(UUID1.toString(), WORKSPACE); + assertThrows(IllegalArgumentException.class, () -> configPersistence.writeConfigs(ConfigSchema.STANDARD_SYNC, configs)); + verifyNoInteractions(decoratedConfigPersistence); + } + + @Test + void testGetConfigSuccess() throws IOException, JsonValidationException, ConfigNotFoundException { + when(decoratedConfigPersistence.getConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardSync.class)) + .thenReturn(STANDARD_SYNC); + assertEquals(STANDARD_SYNC, configPersistence.getConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardSync.class)); + } + + @Test + void testGetConfigFailure() { + assertThrows(IllegalArgumentException.class, + () -> configPersistence.getConfig(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardWorkspace.class)); + verifyNoInteractions(decoratedConfigPersistence); + } + + @Test + void testListConfigsSuccess() throws IOException, JsonValidationException { + when(decoratedConfigPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class)).thenReturn(List.of(STANDARD_SYNC)); + assertEquals(List.of(STANDARD_SYNC), configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class)); + } + + @Test + void testListConfigsFailure() { + assertThrows(IllegalArgumentException.class, + () -> configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardWorkspace.class)); + verifyNoInteractions(decoratedConfigPersistence); + } + + @Test + void testGetConfigWithMetadataSuccess() throws IOException, JsonValidationException, ConfigNotFoundException { + when(decoratedConfigPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardSync.class)) + .thenReturn(withMetadata(STANDARD_SYNC)); + assertEquals(withMetadata(STANDARD_SYNC), + configPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardSync.class)); + } + + @Test + void testGetConfigWithMetadataFailure() { + assertThrows(IllegalArgumentException.class, + () -> configPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SYNC, UUID1.toString(), StandardWorkspace.class)); + verifyNoInteractions(decoratedConfigPersistence); + } + + @Test + void testListConfigsWithMetadataSuccess() throws IOException, JsonValidationException { + when(decoratedConfigPersistence.listConfigsWithMetadata(ConfigSchema.STANDARD_SYNC, StandardSync.class)) + .thenReturn(List.of(withMetadata(STANDARD_SYNC))); + assertEquals( + List.of(withMetadata(STANDARD_SYNC)), + configPersistence.listConfigsWithMetadata(ConfigSchema.STANDARD_SYNC, StandardSync.class)); + } + + @Test + void testListConfigsWithMetadataFailure() { + assertThrows(IllegalArgumentException.class, + () -> configPersistence.listConfigsWithMetadata(ConfigSchema.STANDARD_SYNC, StandardWorkspace.class)); + verifyNoInteractions(decoratedConfigPersistence); + } + + @SuppressWarnings("SameParameterValue") + private static ConfigWithMetadata withMetadata(final StandardSync actorDefinition) { + return new ConfigWithMetadata<>(actorDefinition.getConnectionId().toString(), + ConfigSchema.STANDARD_SYNC.name(), + INSTANT, + INSTANT, + actorDefinition); + } + +} diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java index 3cbca7edcd9d..88be8200284c 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java @@ -15,11 +15,13 @@ import com.google.common.collect.Sets; import io.airbyte.config.ConfigSchema; +import io.airbyte.config.ConfigWithMetadata; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.nio.file.Path; +import java.time.Instant; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,6 +32,7 @@ class ValidatingConfigPersistenceTest { public static final UUID UUID_1 = new UUID(0, 1); + public static final Instant INSTANT = Instant.now(); public static final StandardSourceDefinition SOURCE_1 = new StandardSourceDefinition(); static { @@ -157,4 +160,57 @@ void testListConfigsFailure() throws JsonValidationException, IOException { .listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)); } + @Test + void testGetConfigWithMetadataSuccess() throws IOException, JsonValidationException, ConfigNotFoundException { + when(decoratedConfigPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), StandardSourceDefinition.class)) + .thenReturn(withMetadata(SOURCE_1)); + final ConfigWithMetadata actualConfig = configPersistence + .getConfigWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), StandardSourceDefinition.class); + + assertEquals(withMetadata(SOURCE_1), actualConfig); + } + + @Test + void testGetConfigWithMetadataFailure() throws IOException, JsonValidationException, ConfigNotFoundException { + doThrow(new JsonValidationException("error")).when(schemaValidator).ensure(any(), any()); + when(decoratedConfigPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), StandardSourceDefinition.class)) + .thenReturn(withMetadata(SOURCE_1)); + + assertThrows( + JsonValidationException.class, + () -> configPersistence.getConfigWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), StandardSourceDefinition.class)); + } + + @Test + void testListConfigsWithMetadataSuccess() throws JsonValidationException, IOException { + when(decoratedConfigPersistence.listConfigsWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) + .thenReturn(List.of(withMetadata(SOURCE_1), withMetadata(SOURCE_2))); + + final List> actualConfigs = configPersistence + .listConfigsWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class); + + // noinspection unchecked + assertEquals( + Sets.newHashSet(withMetadata(SOURCE_1), withMetadata(SOURCE_2)), + Sets.newHashSet(actualConfigs)); + } + + @Test + void testListConfigsWithMetadataFailure() throws JsonValidationException, IOException { + doThrow(new JsonValidationException("error")).when(schemaValidator).ensure(any(), any()); + when(decoratedConfigPersistence.listConfigsWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) + .thenReturn(List.of(withMetadata(SOURCE_1), withMetadata(SOURCE_2))); + + assertThrows(JsonValidationException.class, () -> configPersistence + .listConfigsWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)); + } + + private static ConfigWithMetadata withMetadata(final StandardSourceDefinition sourceDef) { + return new ConfigWithMetadata<>(sourceDef.getSourceDefinitionId().toString(), + ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), + INSTANT, + INSTANT, + sourceDef); + } + } From a959a205ba5aa2b1239fce9fb3f9e1ff3cdd63e0 Mon Sep 17 00:00:00 2001 From: cgardens Date: Sun, 20 Feb 2022 12:29:44 -0800 Subject: [PATCH 2/4] add protection for replaceAllConfigs --- .../ClassEnforcingConfigPersistence.java | 12 ++++-- .../ValidatingConfigPersistence.java | 16 +++++++- .../ClassEnforcingConfigPersistenceTest.java | 38 +++++++++++++++++ .../ValidatingConfigPersistenceTest.java | 41 +++++++++++++++++++ 4 files changed, 102 insertions(+), 5 deletions(-) diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistence.java index faba8538561d..20d63e3a1e49 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistence.java @@ -11,8 +11,11 @@ import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -75,8 +78,12 @@ public void deleteConfig(final AirbyteConfig configType, final String configId) @Override public void replaceAllConfigs(final Map> configs, final boolean dryRun) throws IOException { - // todo (cgardens) need to do class enforcement here here. - decoratedPersistence.replaceAllConfigs(configs, dryRun); + final Map> augmentedMap = new HashMap<>(configs).entrySet() + .stream() + .collect(Collectors.toMap( + Entry::getKey, + entry -> entry.getValue().peek(config -> Preconditions.checkArgument(entry.getKey().getClassName().equals(config.getClass()))))); + decoratedPersistence.replaceAllConfigs(augmentedMap, dryRun); } @Override @@ -86,7 +93,6 @@ public Map> dumpConfigs() throws IOException { @Override public void loadData(final ConfigPersistence seedPersistence) throws IOException { - // todo (cgardens) need to do class enforcement here here. decoratedPersistence.loadData(seedPersistence); } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java index 7cd9a21227e0..18f1e88227be 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java @@ -14,6 +14,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -98,8 +100,18 @@ public void deleteConfig(final AirbyteConfig configType, final String configId) @Override public void replaceAllConfigs(final Map> configs, final boolean dryRun) throws IOException { - // todo (cgardens) need to do validation here. - decoratedPersistence.replaceAllConfigs(configs, dryRun); + final Map> augmentedMap = new HashMap<>(configs).entrySet() + .stream() + .collect(Collectors.toMap( + Entry::getKey, + entry -> entry.getValue().peek(config -> { + try { + validateJson(config, entry.getKey()); + } catch (final JsonValidationException e) { + throw new RuntimeException(e); + } + }))); + decoratedPersistence.replaceAllConfigs(augmentedMap, dryRun); } @Override diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistenceTest.java index f4e5d122d58d..b2792c5a0ef8 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ClassEnforcingConfigPersistenceTest.java @@ -6,12 +6,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; import io.airbyte.config.ConfigWithMetadata; import io.airbyte.config.StandardSync; @@ -22,8 +26,11 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.stubbing.Answer; class ClassEnforcingConfigPersistenceTest { @@ -126,6 +133,37 @@ void testListConfigsWithMetadataFailure() { verifyNoInteractions(decoratedConfigPersistence); } + @Test + void testReplaceAllConfigsSuccess() throws IOException, JsonValidationException { + consumeConfigInputStreams(decoratedConfigPersistence); + final Map> configs = ImmutableMap.of(ConfigSchema.STANDARD_SYNC, Stream.of(STANDARD_SYNC)); + configPersistence.replaceAllConfigs(configs, false); + verify(decoratedConfigPersistence).replaceAllConfigs(any(), eq(false)); + } + + @Test + void testReplaceAllConfigsFailure() throws IOException { + consumeConfigInputStreams(decoratedConfigPersistence); + final Map> configs = ImmutableMap.of(ConfigSchema.STANDARD_SYNC, Stream.of(WORKSPACE)); + assertThrows(IllegalArgumentException.class, () -> configPersistence.replaceAllConfigs(configs, false)); + verify(decoratedConfigPersistence).replaceAllConfigs(any(), eq(false)); + } + + /** + * Consumes all streams input via replaceAllConfigs. This will trigger any exceptions that are + * thrown during processing. + * + * @param configPersistence - config persistence mock where this runs. + */ + private static void consumeConfigInputStreams(final ConfigPersistence configPersistence) throws IOException { + doAnswer((Answer) invocation -> { + final Map> argument = invocation.getArgument(0); + // force the streams to be consumed so that we can verify the exception was thrown. + argument.values().forEach(entry -> entry.collect(Collectors.toList())); + return null; + }).when(configPersistence).replaceAllConfigs(any(), eq(false)); + } + @SuppressWarnings("SameParameterValue") private static ConfigWithMetadata withMetadata(final StandardSync actorDefinition) { return new ConfigWithMetadata<>(actorDefinition.getConnectionId().toString(), diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java index 88be8200284c..e8c8df52fa59 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java @@ -7,13 +7,17 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; +import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; import io.airbyte.config.ConfigWithMetadata; import io.airbyte.config.StandardSourceDefinition; @@ -26,8 +30,11 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.stubbing.Answer; class ValidatingConfigPersistenceTest { @@ -205,6 +212,40 @@ void testListConfigsWithMetadataFailure() throws JsonValidationException, IOExce .listConfigsWithMetadata(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)); } + @Test + void testReplaceAllConfigsSuccess() throws IOException, JsonValidationException { + consumeConfigInputStreams(decoratedConfigPersistence); + final Map> configs = ImmutableMap.of(ConfigSchema.STANDARD_SOURCE_DEFINITION, Stream.of(SOURCE_1)); + configPersistence.replaceAllConfigs(configs, false); + verify(decoratedConfigPersistence).replaceAllConfigs(any(), eq(false)); + } + + @Test + void testReplaceAllConfigsFailure() throws IOException, JsonValidationException { + doThrow(new JsonValidationException("error")).when(schemaValidator).ensure(any(), any()); + consumeConfigInputStreams(decoratedConfigPersistence); + final Map> configs = ImmutableMap.of(ConfigSchema.STANDARD_SOURCE_DEFINITION, Stream.of(SOURCE_1)); + // because this takes place in a lambda the JsonValidationException gets rethrown as a + // RuntimeException. + assertThrows(RuntimeException.class, () -> configPersistence.replaceAllConfigs(configs, false)); + verify(decoratedConfigPersistence).replaceAllConfigs(any(), eq(false)); + } + + /** + * Consumes all streams input via replaceAllConfigs. This will trigger any exceptions that are + * thrown during processing. + * + * @param configPersistence - config persistence mock where this runs. + */ + private static void consumeConfigInputStreams(final ConfigPersistence configPersistence) throws IOException { + doAnswer((Answer) invocation -> { + final Map> argument = invocation.getArgument(0); + // force the streams to be consumed so that we can verify the exception was thrown. + argument.values().forEach(entry -> entry.collect(Collectors.toList())); + return null; + }).when(configPersistence).replaceAllConfigs(any(), eq(false)); + } + private static ConfigWithMetadata withMetadata(final StandardSourceDefinition sourceDef) { return new ConfigWithMetadata<>(sourceDef.getSourceDefinitionId().toString(), ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), From d202a9ca051903bdbc9633222c4bb6e2d3c4bc9d Mon Sep 17 00:00:00 2001 From: cgardens Date: Sun, 20 Feb 2022 12:31:37 -0800 Subject: [PATCH 3/4] remove todo --- .../io/airbyte/config/persistence/DatabaseConfigPersistence.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index f40a89e42b67..35817854982d 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -1393,7 +1393,6 @@ private void deleteStandardSync(final String configId) throws IOException { }); } - // todo (cgardens) - how to protect types here? @Override public void replaceAllConfigs(final Map> configs, final boolean dryRun) throws IOException { if (dryRun) { From d038715952148c89c9371b60887e9d59fcbd30e8 Mon Sep 17 00:00:00 2001 From: cgardens Date: Sun, 20 Feb 2022 13:16:56 -0800 Subject: [PATCH 4/4] compile fix --- .../config/persistence/DatabaseConfigPersistence.java | 9 +++++++++ .../io/airbyte/server/handlers/ArchiveHandlerTest.java | 4 ++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index 35817854982d..8026a9e8e302 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -85,6 +85,15 @@ public class DatabaseConfigPersistence implements ConfigPersistence { private final FeatureFlags featureFlags; private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseConfigPersistence.class); + /** + * Entrypoint into DatabaseConfigPersistence. Except in testing, we should never be using it without + * it being decorated with validation classes. + * + * @param database - database where configs are stored + * @param jsonSecretsProcessor - for filtering secrets in export + * @param featureFlags - feature flags that govern secret export behavior + * @return database config persistence wrapped in validation decorators + */ public static ConfigPersistence createWithValidation(final Database database, final JsonSecretsProcessor jsonSecretsProcessor, final FeatureFlags featureFlags) { diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java index 68d6b3cf361e..736c3a8e8212 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java @@ -80,7 +80,7 @@ public class ArchiveHandlerTest { private JobPersistence jobPersistence; private SecretsRepositoryReader secretsRepositoryReader; private SecretsRepositoryWriter secretsRepositoryWriter; - private DatabaseConfigPersistence configPersistence; + private ConfigPersistence configPersistence; private ConfigPersistence seedPersistence; private JsonSecretsProcessor jsonSecretsProcessor; private FeatureFlags featureFlags; @@ -122,7 +122,7 @@ public void setup() throws Exception { jsonSecretsProcessor = mock(JsonSecretsProcessor.class); featureFlags = mock(FeatureFlags.class); when(featureFlags.exposeSecretsInExport()).thenReturn(true); - configPersistence = new DatabaseConfigPersistence(jobDatabase, jsonSecretsProcessor, featureFlags); + configPersistence = DatabaseConfigPersistence.createWithValidation(jobDatabase, jsonSecretsProcessor, featureFlags); configPersistence.replaceAllConfigs(Collections.emptyMap(), false); configPersistence.loadData(seedPersistence); configRepository = new ConfigRepository(configPersistence, configDatabase);