diff --git a/airbyte-integrations/bases/base-java/build.gradle b/airbyte-integrations/bases/base-java/build.gradle index bf729154a0cd..5cb12cd12113 100644 --- a/airbyte-integrations/bases/base-java/build.gradle +++ b/airbyte-integrations/bases/base-java/build.gradle @@ -7,6 +7,8 @@ dependencies { implementation 'commons-cli:commons-cli:1.4' implementation project(':airbyte-protocol:models') + implementation project(':airbyte-queue') + implementation project(":airbyte-json-validation") implementation files(project(':airbyte-integrations:bases:base').airbyteDocker.outputs) } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 4e9ad5ff9ec5..8a955758ca51 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -33,9 +33,11 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.validation.json.JsonSchemaValidator; import java.nio.file.Path; import java.util.Optional; import java.util.Scanner; +import java.util.Set; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +56,7 @@ public class IntegrationRunner { private final Integration integration; private final Destination destination; private final Source source; + private static JsonSchemaValidator validator; public IntegrationRunner(Destination destination) { this(new IntegrationCliParser(), Destination::defaultOutputRecordCollector, destination, null); @@ -75,6 +78,17 @@ public IntegrationRunner(Source source) { this.integration = source != null ? source : destination; this.source = source; this.destination = destination; + validator = new JsonSchemaValidator(); + } + + @VisibleForTesting + IntegrationRunner(IntegrationCliParser cliParser, + Consumer outputRecordCollector, + Destination destination, + Source source, + JsonSchemaValidator jsonSchemaValidator) { + this(cliParser, outputRecordCollector, destination, source); + this.validator = jsonSchemaValidator; } public void run(String[] args) throws Exception { @@ -90,18 +104,20 @@ public void run(String[] args) throws Exception { case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec())); case CHECK -> { final JsonNode config = parseConfig(parsed.getConfigPath()); + validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK"); outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config))); } // source only case DISCOVER -> { final JsonNode config = parseConfig(parsed.getConfigPath()); + validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER"); outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config))); } // todo (cgardens) - it is incongruous that that read and write return airbyte message (the // envelope) while the other commands return what goes inside it. case READ -> { - final JsonNode config = parseConfig(parsed.getConfigPath()); + validateConfig(integration.spec().getConnectionSpecification(), config, "READ"); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); final Optional stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig); final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null)); @@ -112,6 +128,7 @@ public void run(String[] args) throws Exception { // destination only case WRITE -> { final JsonNode config = parseConfig(parsed.getConfigPath()); + validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE"); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector); consumeWriteStream(consumer); @@ -142,6 +159,14 @@ static void consumeWriteStream(AirbyteMessageConsumer consumer) throws Exception } } + private static void validateConfig(JsonNode schemaJson, JsonNode objectJson, String operationType) throws Exception { + final Set validationResult = validator.validate(schemaJson, objectJson); + if (!validationResult.isEmpty()) { + throw new Exception(String.format("Verification error(s) occurred for %s. Errors: %s ", + operationType, validationResult.toString())); + } + } + private static JsonNode parseConfig(Path path) { return Jsons.deserialize(IOs.readFile(path)); } diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index d8e5c9048e01..641e1ef855aa 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -25,6 +25,7 @@ package io.airbyte.integrations.base; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -50,6 +51,7 @@ import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.validation.json.JsonSchemaValidator; import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.URI; @@ -138,10 +140,15 @@ void testCheckSource() throws Exception { when(cliParser.parse(ARGS)).thenReturn(intConfig); when(source.check(CONFIG)).thenReturn(output); - new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS); + final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); + when(source.spec()).thenReturn(expectedConnSpec); + when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG); + JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); + new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS); verify(source).check(CONFIG); verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output)); + verify(jsonSchemaValidator).validate(any(), any()); } @Test @@ -152,44 +159,64 @@ void testCheckDestination() throws Exception { when(cliParser.parse(ARGS)).thenReturn(intConfig); when(destination.check(CONFIG)).thenReturn(output); - new IntegrationRunner(cliParser, stdoutConsumer, destination, null).run(ARGS); + final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); + when(destination.spec()).thenReturn(expectedConnSpec); + when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG); + + JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); + + new IntegrationRunner(cliParser, stdoutConsumer, destination, null, jsonSchemaValidator).run(ARGS); verify(destination).check(CONFIG); verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output)); + verify(jsonSchemaValidator).validate(any(), any()); } @Test void testDiscover() throws Exception { final IntegrationConfig intConfig = IntegrationConfig.discover(configPath); - final AirbyteCatalog output = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName("oceans"))); + final AirbyteCatalog output = new AirbyteCatalog() + .withStreams(Lists.newArrayList(new AirbyteStream().withName("oceans"))); when(cliParser.parse(ARGS)).thenReturn(intConfig); when(source.discover(CONFIG)).thenReturn(output); - new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS); + final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); + when(source.spec()).thenReturn(expectedConnSpec); + when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG); + + JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); + new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS); verify(source).discover(CONFIG); verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(output)); + verify(jsonSchemaValidator).validate(any(), any()); } @Test void testRead() throws Exception { - final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath, statePath); - final AirbyteMessage message1 = new AirbyteMessage() - .withType(Type.RECORD) + final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath, + statePath); + final AirbyteMessage message1 = new AirbyteMessage().withType(Type.RECORD) .withRecord(new AirbyteRecordMessage().withData(Jsons.jsonNode(ImmutableMap.of("names", "byron")))); - final AirbyteMessage message2 = new AirbyteMessage() - .withType(Type.RECORD).withRecord(new AirbyteRecordMessage() - .withData(Jsons.jsonNode(ImmutableMap.of("names", "reginald")))); + final AirbyteMessage message2 = new AirbyteMessage().withType(Type.RECORD).withRecord(new AirbyteRecordMessage() + .withData(Jsons.jsonNode(ImmutableMap.of("names", "reginald")))); when(cliParser.parse(ARGS)).thenReturn(intConfig); - when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)).thenReturn(AutoCloseableIterators.fromIterator(MoreIterators.of(message1, message2))); + when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)) + .thenReturn(AutoCloseableIterators.fromIterator(MoreIterators.of(message1, message2))); - new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS); + final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); + when(source.spec()).thenReturn(expectedConnSpec); + when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG); + + JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); + new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS); verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE); verify(stdoutConsumer).accept(message1); verify(stdoutConsumer).accept(message2); + verify(jsonSchemaValidator).validate(any(), any()); } @Test @@ -199,10 +226,17 @@ void testWrite() throws Exception { when(cliParser.parse(ARGS)).thenReturn(intConfig); when(destination.getConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer)).thenReturn(airbyteMessageConsumerMock); - final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer, destination, null)); + final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class); + when(destination.spec()).thenReturn(expectedConnSpec); + when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG); + + JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class); + + final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer, destination, null, jsonSchemaValidator)); runner.run(ARGS); verify(destination).getConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer); + verify(jsonSchemaValidator).validate(any(), any()); } @Test