Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 All java connectors: Added configValidator to check, discover, read and write calls #4699

Merged
merged 10 commits into from
Jul 16, 2021
2 changes: 2 additions & 0 deletions airbyte-integrations/bases/base-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ dependencies {
implementation 'commons-cli:commons-cli:1.4'

implementation project(':airbyte-protocol:models')
implementation project(':airbyte-queue')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we add the deps to airbyte-queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm...as part of this PR I explicitly added the only implementation project(":airbyte-json-validation").
Have no idea where "airbyte-queue" comes from. Maybe as part of auto-merging masker branch... some kind of merge conflicts but I didn't see it. Shall we remove it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implementation project(":airbyte-json-validation")

implementation files(project(':airbyte-integrations:bases:base').airbyteDocker.outputs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Scanner;
import java.util.Set;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,6 +56,7 @@ public class IntegrationRunner {
private final Integration integration;
private final Destination destination;
private final Source source;
private static JsonSchemaValidator validator;

public IntegrationRunner(Destination destination) {
this(new IntegrationCliParser(), Destination::defaultOutputRecordCollector, destination, null);
Expand All @@ -75,6 +78,17 @@ public IntegrationRunner(Source source) {
this.integration = source != null ? source : destination;
this.source = source;
this.destination = destination;
validator = new JsonSchemaValidator();
}

@VisibleForTesting
IntegrationRunner(IntegrationCliParser cliParser,
Consumer<AirbyteMessage> outputRecordCollector,
Destination destination,
Source source,
JsonSchemaValidator jsonSchemaValidator) {
this(cliParser, outputRecordCollector, destination, source);
this.validator = jsonSchemaValidator;
}

public void run(String[] args) throws Exception {
Expand All @@ -90,18 +104,20 @@ public void run(String[] args) throws Exception {
case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec()));
case CHECK -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK");
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config)));
}
// source only
case DISCOVER -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER");
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config)));
}
// todo (cgardens) - it is incongruous that that read and write return airbyte message (the
// envelope) while the other commands return what goes inside it.
case READ -> {

final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "READ");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
Expand All @@ -112,6 +128,7 @@ public void run(String[] args) throws Exception {
// destination only
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector);
consumeWriteStream(consumer);
Expand Down Expand Up @@ -142,6 +159,14 @@ static void consumeWriteStream(AirbyteMessageConsumer consumer) throws Exception
}
}

private static void validateConfig(JsonNode schemaJson, JsonNode objectJson, String operationType) throws Exception {
final Set<String> validationResult = validator.validate(schemaJson, objectJson);
if (!validationResult.isEmpty()) {
throw new Exception(String.format("Verification error(s) occurred for %s. Errors: %s ",
operationType, validationResult.toString()));
}
}

private static JsonNode parseConfig(Path path) {
return Jsons.deserialize(IOs.readFile(path));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.base;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand All @@ -50,6 +51,7 @@
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -138,10 +140,15 @@ void testCheckSource() throws Exception {
when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.check(CONFIG)).thenReturn(output);

new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);
JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).check(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
Expand All @@ -152,44 +159,64 @@ void testCheckDestination() throws Exception {
when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(destination.check(CONFIG)).thenReturn(output);

new IntegrationRunner(cliParser, stdoutConsumer, destination, null).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(destination.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);

new IntegrationRunner(cliParser, stdoutConsumer, destination, null, jsonSchemaValidator).run(ARGS);

verify(destination).check(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testDiscover() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.discover(configPath);
final AirbyteCatalog output = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName("oceans")));
final AirbyteCatalog output = new AirbyteCatalog()
.withStreams(Lists.newArrayList(new AirbyteStream().withName("oceans")));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.discover(CONFIG)).thenReturn(output);

new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).discover(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testRead() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath, statePath);
final AirbyteMessage message1 = new AirbyteMessage()
.withType(Type.RECORD)
final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath,
statePath);
final AirbyteMessage message1 = new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withData(Jsons.jsonNode(ImmutableMap.of("names", "byron"))));
final AirbyteMessage message2 = new AirbyteMessage()
.withType(Type.RECORD).withRecord(new AirbyteRecordMessage()
.withData(Jsons.jsonNode(ImmutableMap.of("names", "reginald"))));
final AirbyteMessage message2 = new AirbyteMessage().withType(Type.RECORD).withRecord(new AirbyteRecordMessage()
.withData(Jsons.jsonNode(ImmutableMap.of("names", "reginald"))));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)).thenReturn(AutoCloseableIterators.fromIterator(MoreIterators.of(message1, message2)));
when(source.read(CONFIG, CONFIGURED_CATALOG, STATE))
.thenReturn(AutoCloseableIterators.fromIterator(MoreIterators.of(message1, message2)));

new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE);
verify(stdoutConsumer).accept(message1);
verify(stdoutConsumer).accept(message2);
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
Expand All @@ -199,10 +226,17 @@ void testWrite() throws Exception {
when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(destination.getConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer)).thenReturn(airbyteMessageConsumerMock);

final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer, destination, null));
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(destination.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);

final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer, destination, null, jsonSchemaValidator));
runner.run(ARGS);

verify(destination).getConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer);
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
Expand Down