Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 All java connectors: Added configValidator to check, discover, read and write calls #4699

Merged
merged 10 commits into from
Jul 16, 2021
2 changes: 2 additions & 0 deletions airbyte-integrations/bases/base-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ dependencies {
implementation 'commons-cli:commons-cli:1.4'

implementation project(':airbyte-protocol:models')
implementation project(':airbyte-queue')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we add the deps to airbyte-queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm...as part of this PR I explicitly added the only implementation project(":airbyte-json-validation").
Have no idea where "airbyte-queue" comes from. Maybe as part of auto-merging masker branch... some kind of merge conflicts but I didn't see it. Shall we remove it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implementation project(":airbyte-json-validation")

implementation files(project(':airbyte-integrations:bases:base').airbyteDocker.outputs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Scanner;
import java.util.Set;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,6 +56,7 @@ public class IntegrationRunner {
private final Integration integration;
private final Destination destination;
private final Source source;
private static JsonSchemaValidator validator;

public IntegrationRunner(Destination destination) {
this(new IntegrationCliParser(), Destination::defaultOutputRecordCollector, destination, null);
Expand All @@ -68,13 +71,26 @@ public IntegrationRunner(Source source) {
Consumer<AirbyteMessage> outputRecordCollector,
Destination destination,
Source source) {
Preconditions.checkState(destination != null ^ source != null, "can only pass in a destination or a source");
Preconditions.checkState(destination != null ^ source != null,
"can only pass in a destination or a source");
this.cliParser = cliParser;
this.outputRecordCollector = outputRecordCollector;
// integration iface covers the commands that are the same for both source and destination.
this.integration = source != null ? source : destination;
this.source = source;
this.destination = destination;
validator = new JsonSchemaValidator();
}

@VisibleForTesting
IntegrationRunner(IntegrationCliParser cliParser,
Consumer<AirbyteMessage> outputRecordCollector,
Destination destination,
Source source,
JsonSchemaValidator jsonSchemaValidator) {

etsybaev marked this conversation as resolved.
Show resolved Hide resolved
this(cliParser, outputRecordCollector, destination, source);
this.validator = jsonSchemaValidator;
}

public void run(String[] args) throws Exception {
Expand All @@ -87,33 +103,45 @@ public void run(String[] args) throws Exception {

switch (parsed.getCommand()) {
// common
case SPEC -> outputRecordCollector.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec()));
case SPEC -> outputRecordCollector
.accept(new AirbyteMessage().withType(Type.SPEC).withSpec(integration.spec()));
case CHECK -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(integration.check(config)));
validateConfig(integration.spec().getConnectionSpecification(), config, "CHECK");
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS)
.withConnectionStatus(integration.check(config)));
}
// source only
case DISCOVER -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
outputRecordCollector.accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config)));
validateConfig(integration.spec().getConnectionSpecification(), config, "DISCOVER");
outputRecordCollector.accept(
new AirbyteMessage().withType(Type.CATALOG).withCatalog(source.discover(config)));
}
// todo (cgardens) - it is incongruous that that read and write return airbyte message (the
// envelope) while the other commands return what goes inside it.
case READ -> {

final JsonNode config = parseConfig(parsed.getConfigPath());
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
validateConfig(integration.spec().getConnectionSpecification(), config, "READ");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(),
ConfiguredAirbyteCatalog.class);
final Optional<JsonNode> stateOptional = parsed.getStatePath()
.map(IntegrationRunner::parseConfig);
final AutoCloseableIterator<AirbyteMessage> messageIterator = source
.read(config, catalog, stateOptional.orElse(null));
try (messageIterator) {
messageIterator.forEachRemaining(outputRecordCollector::accept);
}
}
// destination only
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector);
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(),
ConfiguredAirbyteCatalog.class);
final AirbyteMessageConsumer consumer = destination
.getConsumer(config, catalog, outputRecordCollector);
consumeWriteStream(consumer);
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
Expand All @@ -131,7 +159,8 @@ static void consumeWriteStream(AirbyteMessageConsumer consumer) throws Exception
consumer.start();
while (input.hasNext()) {
final String inputString = input.next();
final Optional<AirbyteMessage> singerMessageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
final Optional<AirbyteMessage> singerMessageOptional = Jsons
.tryDeserialize(inputString, AirbyteMessage.class);
if (singerMessageOptional.isPresent()) {
consumer.accept(singerMessageOptional.get());
} else {
Expand All @@ -142,6 +171,14 @@ static void consumeWriteStream(AirbyteMessageConsumer consumer) throws Exception
}
}

private static void validateConfig(JsonNode schemaJson, JsonNode objectJson, String operationType) throws Exception {
final Set<String> validationResult = validator.validate(schemaJson, objectJson);
if (!validationResult.isEmpty()) {
throw new Exception(String.format("Verification error(s) occurred for %s. Errors: %s ",
operationType, validationResult.toString()));
}
}

private static JsonNode parseConfig(Path path) {
return Jsons.deserialize(IOs.readFile(path));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.base;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand All @@ -50,6 +51,7 @@
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
Expand All @@ -76,8 +78,10 @@ class IntegrationRunnerTest {
private static final Long EMITTED_AT = Instant.now().toEpochMilli();
private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests");

private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName(STREAM_NAME)));
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG);
private static final AirbyteCatalog CATALOG = new AirbyteCatalog()
.withStreams(Lists.newArrayList(new AirbyteStream().withName(STREAM_NAME)));
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers
.toDefaultConfiguredCatalog(CATALOG);
private static final JsonNode STATE = Jsons.jsonNode(ImmutableMap.of("checkpoint", "05/08/1945"));

private IntegrationCliParser cliParser;
Expand All @@ -98,14 +102,16 @@ void setup() throws IOException {
Path configDir = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test");

configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING);
configuredCatalogPath = IOs.writeFile(configDir, CONFIGURED_CATALOG_FILE_NAME, Jsons.serialize(CONFIGURED_CATALOG));
configuredCatalogPath = IOs
.writeFile(configDir, CONFIGURED_CATALOG_FILE_NAME, Jsons.serialize(CONFIGURED_CATALOG));
statePath = IOs.writeFile(configDir, STATE_FILE_NAME, Jsons.serialize(STATE));
}

@Test
void testSpecSource() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.spec();
final ConnectorSpecification output = new ConnectorSpecification().withDocumentationUrl(new URI("https://docs.airbyte.io/"));
final ConnectorSpecification output = new ConnectorSpecification()
.withDocumentationUrl(new URI("https://docs.airbyte.io/"));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.spec()).thenReturn(output);
Expand All @@ -119,7 +125,8 @@ void testSpecSource() throws Exception {
@Test
void testSpecDestination() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.spec();
final ConnectorSpecification output = new ConnectorSpecification().withDocumentationUrl(new URI("https://docs.airbyte.io/"));
final ConnectorSpecification output = new ConnectorSpecification()
.withDocumentationUrl(new URI("https://docs.airbyte.io/"));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(destination.spec()).thenReturn(output);
Expand All @@ -133,76 +140,121 @@ void testSpecDestination() throws Exception {
@Test
void testCheckSource() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.check(configPath);
final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("it failed");
final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED)
.withMessage("it failed");

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.check(CONFIG)).thenReturn(output);

new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);
JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source,
jsonSchemaValidator)
.run(ARGS);

verify(source).check(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(stdoutConsumer)
.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testCheckDestination() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.check(configPath);
final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("it failed");
final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED)
.withMessage("it failed");

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(destination.check(CONFIG)).thenReturn(output);

new IntegrationRunner(cliParser, stdoutConsumer, destination, null).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(destination.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);

new IntegrationRunner(cliParser, stdoutConsumer, destination, null, jsonSchemaValidator)
.run(ARGS);

verify(destination).check(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(stdoutConsumer)
.accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testDiscover() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.discover(configPath);
final AirbyteCatalog output = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName("oceans")));
final AirbyteCatalog output = new AirbyteCatalog()
.withStreams(Lists.newArrayList(new AirbyteStream().withName("oceans")));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.discover(CONFIG)).thenReturn(output);

new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).discover(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CATALOG).withCatalog(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testRead() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath, statePath);
final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath,
statePath);
final AirbyteMessage message1 = new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withData(Jsons.jsonNode(ImmutableMap.of("names", "byron"))));
.withRecord(new AirbyteRecordMessage().withData(Jsons.jsonNode(ImmutableMap.of("names",
"byron"))));
final AirbyteMessage message2 = new AirbyteMessage()
.withType(Type.RECORD).withRecord(new AirbyteRecordMessage()
.withData(Jsons.jsonNode(ImmutableMap.of("names", "reginald"))));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)).thenReturn(AutoCloseableIterators.fromIterator(MoreIterators.of(message1, message2)));
when(source.read(CONFIG, CONFIGURED_CATALOG,
STATE))
.thenReturn(AutoCloseableIterators.fromIterator(MoreIterators.of(message1, message2)));

new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS);
final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE);
verify(stdoutConsumer).accept(message1);
verify(stdoutConsumer).accept(message2);
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testWrite() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.write(configPath, configuredCatalogPath);
final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class);
when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(destination.getConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer)).thenReturn(airbyteMessageConsumerMock);
when(destination.getConsumer(CONFIG, CONFIGURED_CATALOG,
stdoutConsumer)).thenReturn(airbyteMessageConsumerMock);

final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(destination.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);

final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer, destination, null));
final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer,
destination, null, jsonSchemaValidator));
runner.run(ARGS);

verify(destination).getConsumer(CONFIG, CONFIGURED_CATALOG, stdoutConsumer);
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
Expand Down Expand Up @@ -251,12 +303,14 @@ void testDestinationConsumerLifecycleFailure() throws Exception {
.withData(Jsons.deserialize("{ \"color\": \"yellow\" }"))
.withStream(STREAM_NAME)
.withEmittedAt(EMITTED_AT));
System.setIn(new ByteArrayInputStream((Jsons.serialize(singerMessage1) + "\n" + Jsons.serialize(singerMessage2)).getBytes()));
System.setIn(new ByteArrayInputStream(
(Jsons.serialize(singerMessage1) + "\n" + Jsons.serialize(singerMessage2)).getBytes()));

final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class);
doThrow(new IOException("error")).when(airbyteMessageConsumerMock).accept(singerMessage1);

assertThrows(IOException.class, () -> IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock));
assertThrows(IOException.class,
() -> IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock));

InOrder inOrder = inOrder(airbyteMessageConsumerMock);
inOrder.verify(airbyteMessageConsumerMock).accept(singerMessage1);
Expand Down