diff --git a/airbyte-config/init/src/main/resources/icons/airbyte.svg b/airbyte-config/init/src/main/resources/icons/airbyte.svg new file mode 100644 index 000000000000..c19d229cbc90 --- /dev/null +++ b/airbyte-config/init/src/main/resources/icons/airbyte.svg @@ -0,0 +1,3 @@ + + + diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 4705e5eb53e8..860fe566eb69 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -45,6 +45,12 @@ dockerImageTag: 0.1.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/dynamodb icon: dynamodb.svg +- name: E2E Testing + destinationDefinitionId: 2eb65e87-983a-4fd7-b3e3-9d9dc6eb8537 + dockerRepository: airbyte/destination-e2e-test + dockerImageTag: 0.2.0 + documentationUrl: https://docs.airbyte.io/integrations/destinations/e2e-test + icon: airbyte.svg - destinationDefinitionId: 68f351a7-2745-4bef-ad7f-996b8e51bb8c name: ElasticSearch dockerRepository: airbyte/destination-elasticsearch diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index bc92ecd78d71..51e347b269fa 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -769,6 +769,165 @@ supported_destination_sync_modes: - "overwrite" - "append" +- dockerImage: "airbyte/destination-e2e-test:0.2.0" + spec: + documentationUrl: "https://example.com" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "E2E Test Destination Spec" + type: "object" + oneOf: + - title: "Logging" + required: + - "type" + - "logging_config" + properties: + type: + type: "string" + const: "LOGGING" + default: "LOGGING" + logging_config: + title: "Logging Configuration" + type: "object" + description: "Configurate how the messages are logged." + oneOf: + - title: "First N Entries" + description: "Log first N entries per stream." + type: "object" + required: + - "logging_type" + - "max_entry_count" + properties: + logging_type: + type: "string" + enum: + - "FirstN" + default: "FirstN" + max_entry_count: + title: "N" + description: "Number of entries to log. This destination is for\ + \ testing only. So it won't make sense to log infinitely. The\ + \ maximum is 1,000 entries." + type: "number" + default: 100 + examples: + - 100 + minimum: 1 + maximum: 1000 + - title: "Every N-th Entry" + description: "For each stream, log every N-th entry with a maximum cap." + type: "object" + required: + - "logging_type" + - "nth_entry_to_log" + - "max_entry_count" + properties: + logging_type: + type: "string" + enum: + - "EveryNth" + default: "EveryNth" + nth_entry_to_log: + title: "N" + description: "The N-th entry to log for each stream. N starts from\ + \ 1. For example, when N = 1, every entry is logged; when N =\ + \ 2, every other entry is logged; when N = 3, one out of three\ + \ entries is logged." + type: "number" + example: + - 3 + minimum: 1 + maximum: 1000 + max_entry_count: + title: "Max Log Entries" + description: "Max number of entries to log. This destination is\ + \ for testing only. So it won't make sense to log infinitely.\ + \ The maximum is 1,000 entries." + type: "number" + default: 100 + examples: + - 100 + minimum: 1 + maximum: 1000 + - title: "Random Sampling" + description: "For each stream, randomly log a percentage of the entries\ + \ with a maximum cap." + type: "object" + required: + - "logging_type" + - "sampling_ratio" + - "max_entry_count" + properties: + logging_type: + type: "string" + enum: + - "RandomSampling" + default: "RandomSampling" + sampling_ratio: + title: "Sampling Ratio" + description: "A positive floating number smaller than 1." + type: "number" + default: 0.001 + examples: + - 0.001 + minimum: 0 + maximum: 1 + seed: + title: "Random Number Generator Seed" + description: "When the seed is unspecified, the current time millis\ + \ will be used as the seed." + type: "number" + examples: + - 1900 + max_entry_count: + title: "Max Log Entries" + description: "Max number of entries to log. This destination is\ + \ for testing only. So it won't make sense to log infinitely.\ + \ The maximum is 1,000 entries." + type: "number" + default: 100 + examples: + - 100 + minimum: 1 + maximum: 1000 + - title: "Silent" + required: + - "type" + properties: + type: + type: "string" + const: "SILENT" + default: "SILENT" + - title: "Throttled" + required: + - "type" + - "millis_per_record" + properties: + type: + type: "string" + const: "THROTTLED" + default: "THROTTLED" + millis_per_record: + description: "Number of milli-second to pause in between records." + type: "integer" + - title: "Failing" + required: + - "type" + - "num_messages" + properties: + type: + type: "string" + const: "FAILING" + default: "FAILING" + num_messages: + description: "Number of messages after which to fail." + type: "integer" + supportsIncremental: true + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: + - "overwrite" + - "append" - dockerImage: "airbyte/destination-elasticsearch:0.1.0" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/elasticsearch" diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index e204cf7fb5ac..3ae161b2c5d3 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -112,7 +112,9 @@ | ClickHouse | [![destination-clickhouse](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-clickhouse%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-clickhouse) | | Cassandra | [![destination-cassandra](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-cassandra%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-cassandra) | | Databricks | (Temporarily Not Available) | +| Dev Null | [![destination-dev-null](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-dev-null%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-dev-null) | | Elasticsearch | (Temporarily Not Available) | +| End-to-End Testing | [![destination-e2e-test](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-e2e-test%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-e2e-test) | | Google Cloud Storage (GCS) | [![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-gcs%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-gcs) | | Google Firestore | [![destination-firestore](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-firestore%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-firestore) | | Google PubSub | [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-pubsub) | diff --git a/airbyte-integrations/connectors/destination-dev-null/.dockerignore b/airbyte-integrations/connectors/destination-dev-null/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dev-null/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-dev-null/Dockerfile b/airbyte-integrations/connectors/destination-dev-null/Dockerfile new file mode 100644 index 000000000000..e66cddd27b4d --- /dev/null +++ b/airbyte-integrations/connectors/destination-dev-null/Dockerfile @@ -0,0 +1,10 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte +ENV APPLICATION destination-dev-null + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-dev-null diff --git a/airbyte-integrations/connectors/destination-dev-null/README.md b/airbyte-integrations/connectors/destination-dev-null/README.md new file mode 100644 index 000000000000..bfbf20a9cf2c --- /dev/null +++ b/airbyte-integrations/connectors/destination-dev-null/README.md @@ -0,0 +1,51 @@ +# Destination Dev Null + +This destination is a "safe" version of the [E2E Test destination](https://docs.airbyte.io/integrations/destinations/e2e-test). It only allows the "silent" mode. + +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-dev-null:build +``` + +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-dev-null:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-dev-null:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-dev-null:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-dev-null:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-dev-null:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-dev-null:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-dev-null:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-dev-null/build.gradle b/airbyte-integrations/connectors/destination-dev-null/build.gradle new file mode 100644 index 000000000000..9e8e9eaf9e44 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dev-null/build.gradle @@ -0,0 +1,20 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.dev_null.DevNullDestination' +} + +dependencies { + implementation project(':airbyte-config:models') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-integrations:connectors:destination-e2e-test') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-dev-null') +} diff --git a/airbyte-integrations/connectors/destination-dev-null/src/main/java/io/airbyte/integrations/destination/dev_null/DevNullDestination.java b/airbyte-integrations/connectors/destination-dev-null/src/main/java/io/airbyte/integrations/destination/dev_null/DevNullDestination.java new file mode 100644 index 000000000000..17423cdd6399 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dev-null/src/main/java/io/airbyte/integrations/destination/dev_null/DevNullDestination.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.dev_null; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.spec_modification.SpecModifyingDestination; +import io.airbyte.integrations.destination.e2e_test.TestingDestinations; +import io.airbyte.protocol.models.ConnectorSpecification; +import java.util.Iterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DevNullDestination extends SpecModifyingDestination implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(DevNullDestination.class); + private static final String DEV_NULL_DESTINATION_TITLE = "Dev Null Destination Spec"; + + public DevNullDestination() { + super(new TestingDestinations()); + } + + public static void main(final String[] args) throws Exception { + LOGGER.info("Starting destination: {}", DevNullDestination.class); + new IntegrationRunner(new DevNullDestination()).run(args); + LOGGER.info("Completed destination: {}", DevNullDestination.class); + } + + /** + * 1. Update the title. 2. Only keep the "silent" mode. + */ + @Override + public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) { + final ConnectorSpecification spec = Jsons.clone(originalSpec); + + ((ObjectNode) spec.getConnectionSpecification()).put("title", DEV_NULL_DESTINATION_TITLE); + + final ArrayNode types = (ArrayNode) spec.getConnectionSpecification().get("oneOf"); + final Iterator typesIterator = types.elements(); + while (typesIterator.hasNext()) { + final JsonNode typeNode = typesIterator.next(); + if (!typeNode.get("properties").get("type").get("const").asText().equalsIgnoreCase("silent")) { + typesIterator.remove(); + } + } + return spec; + } + +} diff --git a/airbyte-integrations/connectors/destination-dev-null/src/test-integration/java/io/airbyte/integrations/destination/dev_null/DevNullDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/java/io/airbyte/integrations/destination/dev_null/DevNullDestinationAcceptanceTest.java new file mode 100644 index 000000000000..2946b692f022 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/java/io/airbyte/integrations/destination/dev_null/DevNullDestinationAcceptanceTest.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.dev_null; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.Collections; +import java.util.List; + +public class DevNullDestinationAcceptanceTest extends DestinationAcceptanceTest { + + @Override + protected String getImageName() { + return "airbyte/destination-dev-null:dev"; + } + + @Override + protected JsonNode getConfig() { + return Jsons.jsonNode(Collections.singletonMap("type", "SILENT")); + } + + @Override + protected JsonNode getFailCheckConfig() { + return Jsons.jsonNode(Collections.singletonMap("type", "invalid")); + } + + @Override + protected List retrieveRecords(final TestDestinationEnv testEnv, + final String streamName, + final String namespace, + final JsonNode streamSchema) { + return Collections.emptyList(); + } + + @Override + protected void setup(final TestDestinationEnv testEnv) { + // do nothing + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + // do nothing + } + + @Override + protected void assertSameMessages(final List expected, + final List actual, + final boolean pruneAirbyteInternalFields) { + assertEquals(0, actual.size()); + } + +} diff --git a/airbyte-integrations/connectors/destination-dev-null/src/test/java/io/airbyte/integrations/destination/dev_null/DevNullDestinationTest.java b/airbyte-integrations/connectors/destination-dev-null/src/test/java/io/airbyte/integrations/destination/dev_null/DevNullDestinationTest.java new file mode 100644 index 000000000000..eeade0389f25 --- /dev/null +++ b/airbyte-integrations/connectors/destination-dev-null/src/test/java/io/airbyte/integrations/destination/dev_null/DevNullDestinationTest.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.dev_null; + +import static org.junit.jupiter.api.Assertions.*; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.ConnectorSpecification; +import org.junit.jupiter.api.Test; + +class DevNullDestinationTest { + + @Test + public void testSpec() throws Exception { + final ConnectorSpecification actual = new DevNullDestination().spec(); + final ConnectorSpecification expected = Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class); + + assertEquals(expected, actual); + } + +} diff --git a/airbyte-integrations/connectors/destination-dev-null/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/destination-dev-null/src/test/resources/expected_spec.json new file mode 100644 index 000000000000..f162bc42884a --- /dev/null +++ b/airbyte-integrations/connectors/destination-dev-null/src/test/resources/expected_spec.json @@ -0,0 +1,25 @@ +{ + "documentationUrl": "https://example.com", + "supportsIncremental": true, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Dev Null Destination Spec", + "type": "object", + "oneOf": [ + { + "title": "Silent", + "required": ["type"], + "properties": { + "type": { + "type": "string", + "const": "SILENT", + "default": "SILENT" + } + } + } + ] + } +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/Dockerfile b/airbyte-integrations/connectors/destination-e2e-test/Dockerfile index b0b99f1f74ca..4c0f6c5a50b9 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/Dockerfile +++ b/airbyte-integrations/connectors/destination-e2e-test/Dockerfile @@ -4,7 +4,8 @@ WORKDIR /airbyte ENV APPLICATION destination-e2e-test -ADD build/distributions/${APPLICATION}*.tar /airbyte +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar +RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.2.0 LABEL io.airbyte.name=airbyte/destination-e2e-test diff --git a/airbyte-integrations/connectors/destination-e2e-test/README.md b/airbyte-integrations/connectors/destination-e2e-test/README.md new file mode 100644 index 000000000000..e8f2bee4577a --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/README.md @@ -0,0 +1,70 @@ +# End-to-End Testing Destination + +This is the repository for the Null destination connector in Java. For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/e2e-test). + +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-:build +``` + +#### Create credentials +**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`. +Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. + +**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials. + +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-e2e-test:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-e2e-test:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-e2e-test:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-e2e-test:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-e2e-test:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +#### Dev Null Destination +The Dev Null Destination depends on this connector. It only allows the "silent" mode. When this mode is changed, please make sure that the Dev Null Destination is updated and published accordingly as well. + +## Testing +We use `JUnit` for Java tests. + +### Unit and Integration Tests +Place unit tests under `src/test/io/airbyte/integrations/destinations/e2e-test`. + +#### Acceptance Tests +Airbyte has a standard test suite that all destination connectors must pass. See example(s) in +`src/test-integration/java/io/airbyte/integrations/destinations/e2e-test/`. + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-e2e-test:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-e2e-test:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +2. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +3. Create a Pull Request. +4. Pat yourself on the back for being an awesome contributor. +5. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-e2e-test/build.gradle b/airbyte-integrations/connectors/destination-e2e-test/build.gradle index fbebb2dec265..4ff3a27be3d8 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/build.gradle +++ b/airbyte-integrations/connectors/destination-e2e-test/build.gradle @@ -9,19 +9,11 @@ application { } dependencies { - implementation project(':airbyte-db:lib') - implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-config:models') implementation project(':airbyte-protocol:models') - implementation project(':airbyte-integrations:connectors:destination-jdbc') - - testImplementation project(':airbyte-test-utils') - - testImplementation "org.testcontainers:postgresql:1.15.3" + implementation project(':airbyte-integrations:bases:base-java') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') - - integrationTestJavaImplementation "org.testcontainers:postgresql:1.15.3" - - implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) - integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs) + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-e2e-test') } diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java index 4a4a577b5eba..59a73b68e5b8 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/FailAfterNDestination.java @@ -43,23 +43,22 @@ public FailAfterNConsumer(final long numMessagesAfterWhichToFail, final Consumer this.numMessagesAfterWhichToFail = numMessagesAfterWhichToFail; this.outputRecordCollector = outputRecordCollector; this.numMessagesSoFar = 0; + LOGGER.info("Will fail after {} messages", numMessagesAfterWhichToFail); } @Override public void start() {} @Override - public void accept(final AirbyteMessage message) throws Exception { - LOGGER.info("received record: {}", message); + public void accept(final AirbyteMessage message) { numMessagesSoFar += 1; - LOGGER.info("received {} messages so far", numMessagesSoFar); if (numMessagesSoFar > numMessagesAfterWhichToFail) { throw new IllegalStateException("Forcing a fail after processing " + numMessagesAfterWhichToFail + " messages."); } if (message.getType() == Type.STATE) { - LOGGER.info("emitting state: {}", message); + LOGGER.info("Emitting state: {}", message); outputRecordCollector.accept(message); } } diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/LoggingDestination.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/LoggingDestination.java index 0d456aa6b5c0..171f73ed8dd1 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/LoggingDestination.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/LoggingDestination.java @@ -8,10 +8,11 @@ import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.destination.e2e_test.logging.LoggingConsumer; +import io.airbyte.integrations.destination.e2e_test.logging.TestingLoggerFactory; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.function.Consumer; import org.slf4j.Logger; @@ -33,33 +34,7 @@ public AirbyteConnectionStatus check(final JsonNode config) { public AirbyteMessageConsumer getConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Consumer outputRecordCollector) { - return new RecordConsumer(outputRecordCollector); - } - - public static class RecordConsumer implements AirbyteMessageConsumer { - - private final Consumer outputRecordCollector; - - public RecordConsumer(final Consumer outputRecordCollector) { - this.outputRecordCollector = outputRecordCollector; - } - - @Override - public void start() {} - - @Override - public void accept(final AirbyteMessage message) { - LOGGER.info("record: {}", message); - - if (message.getType() == Type.STATE) { - LOGGER.info("emitting state: {}", message); - outputRecordCollector.accept(message); - } - } - - @Override - public void close() {} - + return new LoggingConsumer(new TestingLoggerFactory(config), catalog, outputRecordCollector); } } diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/SilentDestination.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/SilentDestination.java index f53108d98f64..aec6c3b1e680 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/SilentDestination.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/SilentDestination.java @@ -13,19 +13,13 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This destination silently receives records. */ public class SilentDestination extends BaseConnector implements Destination { - private static final Logger LOGGER = LoggerFactory.getLogger(LoggingDestination.class); - private static final AtomicInteger counter = new AtomicInteger(); - @Override public AirbyteConnectionStatus check(final JsonNode config) { return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); @@ -52,14 +46,8 @@ public void start() {} @Override public void accept(final AirbyteMessage message) { if (message.getType() == Type.STATE) { - LOGGER.info("emitting state: {}", message); outputRecordCollector.accept(message); } - - final var curr = counter.incrementAndGet(); - if (curr % 1000 == 0) { - LOGGER.info("Record count: {}", curr); - } } @Override diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java index c6bf3ee08dbe..a9055b8ba173 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/TestingDestinations.java @@ -11,6 +11,7 @@ import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.util.Map; @@ -58,7 +59,11 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, @Override public AirbyteConnectionStatus check(final JsonNode config) throws Exception { - return selectDestination(config).check(config); + try { + return selectDestination(config).check(config); + } catch (final Exception e) { + return new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage(e.getMessage()); + } } public static void main(final String[] args) throws Exception { diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.java index 36a65cc77c57..b3be0d18c6e9 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.java +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/ThrottledDestination.java @@ -47,6 +47,7 @@ public static class ThrottledConsumer implements AirbyteMessageConsumer { public ThrottledConsumer(final long millisPerRecord, final Consumer outputRecordCollector) { this.millisPerRecord = millisPerRecord; this.outputRecordCollector = outputRecordCollector; + LOGGER.info("Will sleep {} millis before processing every record", millisPerRecord); } @Override @@ -54,12 +55,10 @@ public void start() {} @Override public void accept(final AirbyteMessage message) throws Exception { - LOGGER.info("received record: {}", message); sleep(millisPerRecord); - LOGGER.info("completed sleep"); if (message.getType() == Type.STATE) { - LOGGER.info("emitting state: {}", message); + LOGGER.info("Emitting state: {}", message); outputRecordCollector.accept(message); } } diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.java new file mode 100644 index 000000000000..2f6f2c669f0b --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/BaseLogger.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.e2e_test.logging; + +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +public abstract class BaseLogger implements TestingLogger { + + protected final AirbyteStreamNameNamespacePair streamNamePair; + protected final int maxEntryCount; + protected int loggedEntryCount = 0; + + public BaseLogger(final AirbyteStreamNameNamespacePair streamNamePair, final int maxEntryCount) { + this.streamNamePair = streamNamePair; + this.maxEntryCount = maxEntryCount; + } + + protected String entryMessage(final AirbyteRecordMessage recordMessage) { + return String.format("[%s] %s #%04d: %s", + emissionTimestamp(recordMessage.getEmittedAt()), + streamName(streamNamePair), + loggedEntryCount, + recordMessage.getData()); + } + + protected static String streamName(final AirbyteStreamNameNamespacePair pair) { + if (pair.getNamespace() == null) { + return pair.getName(); + } else { + return String.format("%s.%s", pair.getNamespace(), pair.getName()); + } + } + + protected static String emissionTimestamp(final long emittedAt) { + return OffsetDateTime + .ofInstant(Instant.ofEpochMilli(emittedAt), ZoneId.systemDefault()) + .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); + } + +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.java new file mode 100644 index 000000000000..53ebadd769c9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/EveryNthLogger.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.e2e_test.logging; + +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EveryNthLogger extends BaseLogger implements TestingLogger { + + private static final Logger LOGGER = LoggerFactory.getLogger(EveryNthLogger.class); + + private final int nthEntryToLog; + private int currentEntry = 0; + + public EveryNthLogger(final AirbyteStreamNameNamespacePair streamNamePair, final int nthEntryToLog, final int maxEntryCount) { + super(streamNamePair, maxEntryCount); + this.nthEntryToLog = nthEntryToLog; + } + + @Override + public void log(final AirbyteRecordMessage recordMessage) { + if (loggedEntryCount >= maxEntryCount) { + return; + } + + currentEntry += 1; + if (currentEntry % nthEntryToLog == 0) { + loggedEntryCount += 1; + LOGGER.info(entryMessage(recordMessage)); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.java new file mode 100644 index 000000000000..a877b0edce0c --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/FirstNLogger.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.e2e_test.logging; + +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FirstNLogger extends BaseLogger implements TestingLogger { + + private static final Logger LOGGER = LoggerFactory.getLogger(FirstNLogger.class); + + public FirstNLogger(final AirbyteStreamNameNamespacePair streamNamePair, final int maxEntryCount) { + super(streamNamePair, maxEntryCount); + } + + @Override + public void log(final AirbyteRecordMessage recordMessage) { + if (loggedEntryCount >= maxEntryCount) { + return; + } + + loggedEntryCount += 1; + LOGGER.info(entryMessage(recordMessage)); + } + +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java new file mode 100644 index 000000000000..ff7ce7ad282a --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/LoggingConsumer.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.e2e_test.logging; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LoggingConsumer implements AirbyteMessageConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(LoggingConsumer.class); + + private final TestingLoggerFactory loggerFactory; + private final ConfiguredAirbyteCatalog configuredCatalog; + private final Consumer outputRecordCollector; + private final Map loggers; + + public LoggingConsumer(final TestingLoggerFactory loggerFactory, + final ConfiguredAirbyteCatalog configuredCatalog, + final Consumer outputRecordCollector) { + this.loggerFactory = loggerFactory; + this.configuredCatalog = configuredCatalog; + this.outputRecordCollector = outputRecordCollector; + this.loggers = new HashMap<>(); + } + + @Override + public void start() { + for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { + final AirbyteStream stream = configuredStream.getStream(); + final AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream); + final TestingLogger logger = loggerFactory.create(streamNamePair); + loggers.put(streamNamePair, logger); + } + } + + @Override + public void accept(final AirbyteMessage message) { + if (message.getType() == Type.STATE) { + LOGGER.info("Emitting state: {}", message); + outputRecordCollector.accept(message); + return; + } else if (message.getType() != Type.RECORD) { + return; + } + + final AirbyteRecordMessage recordMessage = message.getRecord(); + final AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage); + + if (!loggers.containsKey(pair)) { + throw new IllegalArgumentException( + String.format( + "Message contained record from a stream that was not in the catalog.\n Catalog: %s\n Message: %s", + Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); + } + + loggers.get(pair).log(recordMessage); + } + + @Override + public void close() {} + +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.java new file mode 100644 index 000000000000..93831e0e3c20 --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/RandomSamplingLogger.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.e2e_test.logging; + +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.Random; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RandomSamplingLogger extends BaseLogger implements TestingLogger { + + private static final Logger LOGGER = LoggerFactory.getLogger(RandomSamplingLogger.class); + + private final double samplingRatio; + private final Random random; + + public RandomSamplingLogger(final AirbyteStreamNameNamespacePair streamNamePair, + final double samplingRatio, + final long seed, + final int maxEntryCount) { + super(streamNamePair, maxEntryCount); + this.samplingRatio = samplingRatio; + this.random = new Random(seed); + } + + @Override + public void log(final AirbyteRecordMessage recordMessage) { + if (loggedEntryCount >= maxEntryCount) { + return; + } + + if (random.nextDouble() < samplingRatio) { + loggedEntryCount += 1; + LOGGER.info(entryMessage(recordMessage)); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLogger.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLogger.java new file mode 100644 index 000000000000..b4d2a3f81c78 --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLogger.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.e2e_test.logging; + +import io.airbyte.protocol.models.AirbyteRecordMessage; + +public interface TestingLogger { + + enum LoggingType { + FirstN, + EveryNth, + RandomSampling + } + + void log(AirbyteRecordMessage recordMessage); + +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.java b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.java new file mode 100644 index 000000000000..4ac900c8584c --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/java/io/airbyte/integrations/destination/e2e_test/logging/TestingLoggerFactory.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.e2e_test.logging; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.destination.e2e_test.logging.TestingLogger.LoggingType; + +public class TestingLoggerFactory { + + private final JsonNode config; + + public TestingLoggerFactory(final JsonNode config) { + this.config = config; + } + + public TestingLogger create(final AirbyteStreamNameNamespacePair streamNamePair) { + if (!config.has("logging_config")) { + throw new IllegalArgumentException("Property logging_config is required, but not found"); + } + + final JsonNode logConfig = config.get("logging_config"); + final LoggingType loggingType = LoggingType.valueOf(logConfig.get("logging_type").asText()); + switch (loggingType) { + case FirstN -> { + final int maxEntryCount = logConfig.get("max_entry_count").asInt(); + return new FirstNLogger(streamNamePair, maxEntryCount); + } + case EveryNth -> { + final int nthEntryToLog = logConfig.get("nth_entry_to_log").asInt(); + final int maxEntryCount = logConfig.get("max_entry_count").asInt(); + return new EveryNthLogger(streamNamePair, nthEntryToLog, maxEntryCount); + } + case RandomSampling -> { + final double samplingRatio = logConfig.get("sampling_ratio").asDouble(); + final long seed = logConfig.has("seed") ? logConfig.get("seed").asLong() : System.currentTimeMillis(); + final int maxEntryCount = logConfig.get("max_entry_count").asInt(); + return new RandomSamplingLogger(streamNamePair, samplingRatio, seed, maxEntryCount); + } + default -> throw new IllegalArgumentException("Unexpected logging type: " + loggingType); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-e2e-test/src/main/resources/spec.json index 39df3bbafb29..7fc0e45a6801 100644 --- a/airbyte-integrations/connectors/destination-e2e-test/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-e2e-test/src/main/resources/spec.json @@ -11,20 +11,122 @@ "oneOf": [ { "title": "Logging", - "required": ["type"], - "additionalProperties": false, + "required": ["type", "logging_config"], "properties": { "type": { "type": "string", "const": "LOGGING", "default": "LOGGING" + }, + "logging_config": { + "title": "Logging Configuration", + "type": "object", + "description": "Configurate how the messages are logged.", + "oneOf": [ + { + "title": "First N Entries", + "description": "Log first N entries per stream.", + "type": "object", + "required": ["logging_type", "max_entry_count"], + "properties": { + "logging_type": { + "type": "string", + "enum": ["FirstN"], + "default": "FirstN" + }, + "max_entry_count": { + "title": "N", + "description": "Number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries.", + "type": "number", + "default": 100, + "examples": [100], + "minimum": 1, + "maximum": 1000 + } + } + }, + { + "title": "Every N-th Entry", + "description": "For each stream, log every N-th entry with a maximum cap.", + "type": "object", + "required": [ + "logging_type", + "nth_entry_to_log", + "max_entry_count" + ], + "properties": { + "logging_type": { + "type": "string", + "enum": ["EveryNth"], + "default": "EveryNth" + }, + "nth_entry_to_log": { + "title": "N", + "description": "The N-th entry to log for each stream. N starts from 1. For example, when N = 1, every entry is logged; when N = 2, every other entry is logged; when N = 3, one out of three entries is logged.", + "type": "number", + "example": [3], + "minimum": 1, + "maximum": 1000 + }, + "max_entry_count": { + "title": "Max Log Entries", + "description": "Max number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries.", + "type": "number", + "default": 100, + "examples": [100], + "minimum": 1, + "maximum": 1000 + } + } + }, + { + "title": "Random Sampling", + "description": "For each stream, randomly log a percentage of the entries with a maximum cap.", + "type": "object", + "required": [ + "logging_type", + "sampling_ratio", + "max_entry_count" + ], + "properties": { + "logging_type": { + "type": "string", + "enum": ["RandomSampling"], + "default": "RandomSampling" + }, + "sampling_ratio": { + "title": "Sampling Ratio", + "description": "A positive floating number smaller than 1.", + "type": "number", + "default": 0.001, + "examples": [0.001], + "minimum": 0, + "maximum": 1 + }, + "seed": { + "title": "Random Number Generator Seed", + "description": "When the seed is unspecified, the current time millis will be used as the seed.", + "type": "number", + "examples": [1900] + }, + "max_entry_count": { + "title": "Max Log Entries", + "description": "Max number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries.", + "type": "number", + "default": 100, + "examples": [100], + "minimum": 1, + "maximum": 1000 + } + } + } + ] } } }, { "title": "Silent", "required": ["type"], - "additionalProperties": false, "properties": { "type": { "type": "string", @@ -36,7 +138,6 @@ { "title": "Throttled", "required": ["type", "millis_per_record"], - "additionalProperties": false, "properties": { "type": { "type": "string", @@ -44,7 +145,7 @@ "default": "THROTTLED" }, "millis_per_record": { - "description": "Time to pause in between records.", + "description": "Number of milli-second to pause in between records.", "type": "integer" } } @@ -52,7 +153,6 @@ { "title": "Failing", "required": ["type", "num_messages"], - "additionalProperties": false, "properties": { "type": { "type": "string", diff --git a/airbyte-integrations/connectors/destination-e2e-test/src/test-integration/java/io/airbyte/integrations/destination/e2e_test/TestingSilentDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-e2e-test/src/test-integration/java/io/airbyte/integrations/destination/e2e_test/TestingSilentDestinationAcceptanceTest.java new file mode 100644 index 000000000000..b7b7034a5918 --- /dev/null +++ b/airbyte-integrations/connectors/destination-e2e-test/src/test-integration/java/io/airbyte/integrations/destination/e2e_test/TestingSilentDestinationAcceptanceTest.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.e2e_test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.e2e_test.TestingDestinations.TestDestinationType; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.Collections; +import java.util.List; + +public class TestingSilentDestinationAcceptanceTest extends DestinationAcceptanceTest { + + @Override + protected String getImageName() { + return "airbyte/destination-e2e-test:dev"; + } + + @Override + protected JsonNode getConfig() { + return Jsons.jsonNode(Collections.singletonMap("type", TestDestinationType.SILENT.name())); + } + + @Override + protected JsonNode getFailCheckConfig() { + return Jsons.jsonNode(Collections.singletonMap("type", "invalid")); + } + + @Override + protected List retrieveRecords(final TestDestinationEnv testEnv, + final String streamName, + final String namespace, + final JsonNode streamSchema) { + return Collections.emptyList(); + } + + @Override + protected void setup(final TestDestinationEnv testEnv) { + // do nothing + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + // do nothing + } + + @Override + protected void assertSameMessages(final List expected, + final List actual, + final boolean pruneAirbyteInternalFields) { + assertEquals(0, actual.size()); + } + +} diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 855533918f0b..4ac7009144a8 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -722,7 +722,7 @@ public void testCheckpointing() throws Exception { "E2E Test Destination -" + UUID.randomUUID(), workspaceId, destinationDefinition.getDestinationDefinitionId(), - Jsons.jsonNode(ImmutableMap.of("type", "LOGGING"))); + Jsons.jsonNode(ImmutableMap.of("type", "SILENT"))); final String connectionName = "test-connection"; final UUID sourceId = source.getSourceId(); diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index c237c61a1cba..7d0255eb5d33 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -164,6 +164,7 @@ * [Databricks](integrations/destinations/databricks.md) * [DynamoDB](integrations/destinations/dynamodb.md) * [Elasticsearch](integrations/destinations/elasticsearch.md) + * [End-to-End Testing](integrations/destinations/e2e-test.md) * [Chargify](integrations/destinations/chargify.md) * [Google Cloud Storage (GCS)](integrations/destinations/gcs.md) * [Google Firestore](integrations/destinations/firestore.md) diff --git a/docs/integrations/destinations/e2e-test.md b/docs/integrations/destinations/e2e-test.md new file mode 100644 index 000000000000..3edfaf50155c --- /dev/null +++ b/docs/integrations/destinations/e2e-test.md @@ -0,0 +1,56 @@ +# End-to-End Testing Destination + +This destination is for testing of Airbyte connections. It can be set up as a source message logger, a `/dev/null`, or to mimic specific behaviors (e.g. exception during the sync). Please use it with discretion. This destination may log your data, and expose sensitive information. + +## Features + +| Feature | Supported | Notes | +| :--- | :--- | :--- | +| Full Refresh Sync | Yes | | +| Incremental Sync | Yes | | +| Replicate Incremental Deletes | No | | +| SSL connection | No | | +| SSH Tunnel Support | No | | + +## Mode + +### Silent (`/dev/null`) + +**This is the only mode allowed on Airbyte Cloud.** + +This mode works as `/dev/null`. It does nothing about any data from the source connector. This is usually only useful for performance testing of the source connector. + +### Logging + +This mode logs the data from the source connector. It will log at most 1,000 data entries. + +There are the different logging modes to choose from: + +| Mode | Notes | Parameters | +| :--- | :--- | :--- | +| First N entries | Log the first N number of data entries for each data stream. | N: how many entries to log. | +| Every N-th entry | Log every N-th entry for each data stream. When N=1, it will log every entry. When N=2, it will log every other entry. Etc. | N: the N-th entry to log. Max entry count: max number of entries to log. | +| Random sampling | Log a random percentage of the entries for each data stream. | Sampling ratio: a number in range of `[0, 1]`. Optional seed: default to system epoch time. Max entry count: max number of entries to log. | + +### Throttling + +This mode mimics a slow data sync. You can specify the time (in millisecond) of delay between each message from the source is processed. + +### Failing + +This mode throws an exception after receiving a configurable number of messages. + +## CHANGELOG + +### E2E Testing Destination + +| Version | Date | Pull Request | Subject | +| :------ | :--------- | :------------------------------------------------------- | :--- | +| 0.2.0 | 2021-12-16 | [\#8824](https://github.com/airbytehq/airbyte/pull/8824) | Add multiple logging modes. | +| 0.1.0 | 2021-05-25 | [\#3290](https://github.com/airbytehq/airbyte/pull/3290) | Create initial version. | + +### Dev Null Destination + +| Version | Date | Pull Request | Subject | +| :------ | :--------- | :------------------------------------------------------- | :--- | +| 0.1.0 | 2021-12-16 | [\#8824](https://github.com/airbytehq/airbyte/pull/8824) | Create initial version. |