Skip to content

Commit

Permalink
🎉 All java connectors: Added configValidator to check, discover, read…
Browse files Browse the repository at this point in the history
… and write calls (#4699)

* Added configValidator to java connectors
  • Loading branch information
etsybaev authored Jul 16, 2021
1 parent 605c06b commit 7f4315f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 14 deletions.
2 changes: 2 additions & 0 deletions airbyte-integrations/bases/base-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ dependencies {
implementation 'commons-cli:commons-cli:1.4'

implementation project(':airbyte-protocol:models')
implementation project(':airbyte-queue')
implementation project(":airbyte-json-validation")

implementation files(project(':airbyte-integrations:bases:base').airbyteDocker.outputs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Scanner;
import java.util.Set;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,6 +56,7 @@ public class IntegrationRunner {
private final Integration integration;
private final Destination destination;
private final Source source;
private static JsonSchemaValidator validator;

public IntegrationRunner(Destination destination) {
this(new IntegrationCliParser(), Destination::defaultOutputRecordCollector, destination, null);
Expand All @@ -75,6 +78,17 @@ public IntegrationRunner(Source source) {
this.integration = source != null ? source : destination;
this.source = source;
this.destination = destination;
validator = new JsonSchemaValidator();
}

@VisibleForTesting
IntegrationRunner(IntegrationCliParser cliParser,
Consumer<AirbyteMessage> outputRecordCollector,
Destination destination,
Source source,
JsonSchemaValidator jsonSchemaValidator) {
this(cliParser, outputRecordCollector, destination, source);
this.validator = jsonSchemaValidator;
}

public void run(String[] args) throws Exception {
Expand All @@ -90,18 +104,20 @@ public void run(String[] args) throws Exception {
case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec()));
case CHECK -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK");
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config)));
}
// source only
case DISCOVER -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER");
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config)));
}
// todo (cgardens) - it is incongruous that that read and write return airbyte message (the
// envelope) while the other commands return what goes inside it.
case READ -> {

final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "READ");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
Expand All @@ -112,6 +128,7 @@ public void run(String[] args) throws Exception {
// destination only
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector);
consumeWriteStream(consumer);
Expand Down Expand Up @@ -142,6 +159,14 @@ static void consumeWriteStream(AirbyteMessageConsumer consumer) throws Exception
}
}

private static void validateConfig(JsonNode schemaJson, JsonNode objectJson, String operationType) throws Exception {
final Set<String> validationResult = validator.validate(schemaJson, objectJson);
if (!validationResult.isEmpty()) {
throw new Exception(String.format("Verification error(s) occurred for %s. Errors: %s ",
operationType, validationResult.toString()));
}
}

private static JsonNode parseConfig(Path path) {
return Jsons.deserialize(IOs.readFile(path));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.base;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand All @@ -50,6 +51,7 @@
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -138,10 +140,15 @@ void testCheckSource() throws Exception {
when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.check(CONFIG)).thenReturn(output);

new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);
JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).check(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
Expand All @@ -152,44 +159,64 @@ void testCheckDestination() throws Exception {
when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(destination.check(CONFIG)).thenReturn(output);

new IntegrationRunner(cliParser, stdoutConsumer, destination, null).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(destination.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);

new IntegrationRunner(cliParser, stdoutConsumer, destination, null, jsonSchemaValidator).run(ARGS);

verify(destination).check(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testDiscover() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.discover(configPath);
final AirbyteCatalog output = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName("oceans")));
final AirbyteCatalog output = new AirbyteCatalog()
.withStreams(Lists.newArrayList(new AirbyteStream().withName("oceans")));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.discover(CONFIG)).thenReturn(output);

new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).discover(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testRead() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath, statePath);
final AirbyteMessage message1 = new AirbyteMessage()
.withType(Type.RECORD)
final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath,
statePath);
final AirbyteMessage message1 = new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withData(Jsons.jsonNode(ImmutableMap.of("names", "byron"))));
final AirbyteMessage message2 = new AirbyteMessage()
.withType(Type.RECORD).withRecord(new AirbyteRecordMessage()
.withData(Jsons.jsonNode(ImmutableMap.of("names", "reginald"))));
final AirbyteMessage message2 = new AirbyteMessage().withType(Type.RECORD).withRecord(new AirbyteRecordMessage()
.withData(Jsons.jsonNode(ImmutableMap.of("names", "reginald"))));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)).thenReturn(AutoCloseableIterators.fromIterator(MoreIterators.of(message1, message2)));
when(source.read(CONFIG, CONFIGURED_CATALOG, STATE))
.thenReturn(AutoCloseableIterators.fromIterator(MoreIterators.of(message1, message2)));

new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE);
verify(stdoutConsumer).accept(message1);
verify(stdoutConsumer).accept(message2);
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
Expand All @@ -199,10 +226,17 @@ void testWrite() throws Exception {
when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(destination.getConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer)).thenReturn(airbyteMessageConsumerMock);

final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer, destination, null));
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(destination.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);

final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer, destination, null, jsonSchemaValidator));
runner.run(ARGS);

verify(destination).getConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer);
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
Expand Down

0 comments on commit 7f4315f

Please sign in to comment.