diff --git a/airbyte-integrations/connectors/destination-null/.dockerignore b/airbyte-integrations/connectors/destination-null/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-null/Dockerfile b/airbyte-integrations/connectors/destination-null/Dockerfile new file mode 100644 index 000000000000..24f089085779 --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/Dockerfile @@ -0,0 +1,11 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte +ENV APPLICATION destination-null + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-null diff --git a/airbyte-integrations/connectors/destination-null/README.md b/airbyte-integrations/connectors/destination-null/README.md new file mode 100644 index 000000000000..3079fc9aa19e --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/README.md @@ -0,0 +1,68 @@ +# Destination Null + +This is the repository for the Null destination connector in Java. +For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/null). + +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-null:build +``` + +#### Create credentials +**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`. +Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. + +**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials. + +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-null:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-null:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-null:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-null:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-null:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +## Testing +We use `JUnit` for Java tests. + +### Unit and Integration Tests +Place unit tests under `src/test/io/airbyte/integrations/destinations/null`. + +#### Acceptance Tests +Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in +`src/test-integration/java/io/airbyte/integrations/destinations/nullDestinationAcceptanceTest.java`. + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-null:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-null:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-null/build.gradle b/airbyte-integrations/connectors/destination-null/build.gradle new file mode 100644 index 000000000000..2aaa93d11eb3 --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/build.gradle @@ -0,0 +1,19 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.destination_null.NullDestination' +} + +dependencies { + implementation project(':airbyte-config:models') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:bases:base-java') + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-null') +} diff --git a/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/NullConsumer.java b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/NullConsumer.java new file mode 100644 index 000000000000..ed45c356b6e2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/NullConsumer.java @@ -0,0 +1,76 @@ +package io.airbyte.integrations.destination.destination_null; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.destination.destination_null.logger.NullDestinationLogger; +import io.airbyte.integrations.destination.destination_null.logger.NullDestinationLoggerFactory; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +public class NullConsumer extends FailureTrackingAirbyteMessageConsumer { + + private final NullDestinationLoggerFactory loggerFactory; + private final ConfiguredAirbyteCatalog configuredCatalog; + private final Consumer outputRecordCollector; + private final Map loggers; + + private AirbyteMessage lastStateMessage = null; + + public NullConsumer(NullDestinationLoggerFactory loggerFactory, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + this.loggerFactory = loggerFactory; + this.configuredCatalog = configuredCatalog; + this.outputRecordCollector = outputRecordCollector; + this.loggers = new HashMap<>(); + } + + @Override + protected void startTracked() { + for (ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { + final AirbyteStream stream = configuredStream.getStream(); + final AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream); + final NullDestinationLogger logger = loggerFactory.create(streamNamePair); + loggers.put(streamNamePair, logger); + } + } + + @Override + protected void acceptTracked(AirbyteMessage airbyteMessage) { + if (airbyteMessage.getType() == Type.STATE) { + this.lastStateMessage = airbyteMessage; + return; + } else if (airbyteMessage.getType() != Type.RECORD) { + return; + } + + AirbyteRecordMessage recordMessage = airbyteMessage.getRecord(); + AirbyteStreamNameNamespacePair pair = AirbyteStreamNameNamespacePair + .fromRecordMessage(recordMessage); + + if (!loggers.containsKey(pair)) { + throw new IllegalArgumentException( + String.format( + "Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s", + Jsons.serialize(configuredCatalog), Jsons.serialize(recordMessage))); + } + + loggers.get(pair).log(recordMessage); + } + + @Override + protected void close(boolean hasFailed) { + if (!hasFailed) { + outputRecordCollector.accept(lastStateMessage); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/NullDestination.java b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/NullDestination.java new file mode 100644 index 000000000000..13fca00407e4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/NullDestination.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.destination_null; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.BaseConnector; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.destination.destination_null.logger.NullDestinationLogger; +import io.airbyte.integrations.destination.destination_null.logger.NullDestinationLoggerFactory; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NullDestination extends BaseConnector implements Destination { + + public static void main(String[] args) throws Exception { + new IntegrationRunner(new NullDestination()).run(args); + } + + /** + * The null destination is always available! + */ + @Override + public AirbyteConnectionStatus check(JsonNode config) { + return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } + + @Override + public AirbyteMessageConsumer getConsumer(JsonNode config, + ConfiguredAirbyteCatalog configuredCatalog, + Consumer outputRecordCollector) { + return new NullConsumer(new NullDestinationLoggerFactory(config), configuredCatalog, outputRecordCollector); + } + +} diff --git a/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/BaseLogger.java b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/BaseLogger.java new file mode 100644 index 000000000000..df65e9f2be40 --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/BaseLogger.java @@ -0,0 +1,45 @@ +package io.airbyte.integrations.destination.destination_null.logger; + +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class BaseLogger implements NullDestinationLogger { + + protected final AirbyteStreamNameNamespacePair streamNamePair; + protected final int maxEntryCount; + protected int loggedEntryCount = 0; + + public BaseLogger(AirbyteStreamNameNamespacePair streamNamePair, int maxEntryCount) { + this.streamNamePair = streamNamePair; + this.maxEntryCount = maxEntryCount; + } + + protected String entryMessage(AirbyteRecordMessage recordMessage) { + return String.format("[%s] %s #%04d: %s", + emissionTimestamp(recordMessage.getEmittedAt()), + streamName(streamNamePair), + loggedEntryCount, + recordMessage.getData()); + } + + protected static String streamName(AirbyteStreamNameNamespacePair pair) { + if (pair.getNamespace() == null) { + return pair.getName(); + } else { + return String.format("%s.%s", pair.getNamespace(), pair.getName()); + } + } + + protected static String emissionTimestamp(long emittedAt) { + return OffsetDateTime + .ofInstant(Instant.ofEpochMilli(emittedAt), ZoneId.systemDefault()) + .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); + } + +} diff --git a/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/DevNull.java b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/DevNull.java new file mode 100644 index 000000000000..a627cfa44f59 --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/DevNull.java @@ -0,0 +1,17 @@ +package io.airbyte.integrations.destination.destination_null.logger; + +import io.airbyte.protocol.models.AirbyteRecordMessage; + +public class DevNull implements NullDestinationLogger { + + public static final DevNull SINGLETON = new DevNull(); + + private DevNull() { + } + + @Override + public void log(AirbyteRecordMessage recordMessage) { + // nothing happens in /dev/null + } + +} diff --git a/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/EveryNthLogger.java b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/EveryNthLogger.java new file mode 100644 index 000000000000..5d645e888dac --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/EveryNthLogger.java @@ -0,0 +1,33 @@ +package io.airbyte.integrations.destination.destination_null.logger; + +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EveryNthLogger extends BaseLogger implements NullDestinationLogger { + + private static final Logger LOGGER = LoggerFactory.getLogger(EveryNthLogger.class); + + private final int nthEntryToLog; + private int currentEntry = 0; + + public EveryNthLogger(AirbyteStreamNameNamespacePair streamNamePair, int nthEntryToLog, int maxEntryCount) { + super(streamNamePair, maxEntryCount); + this.nthEntryToLog = nthEntryToLog; + } + + @Override + public void log(AirbyteRecordMessage recordMessage) { + if (loggedEntryCount >= maxEntryCount) { + return; + } + + currentEntry += 1; + if (currentEntry % nthEntryToLog == 0) { + loggedEntryCount += 1; + LOGGER.info(entryMessage(recordMessage)); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/FirstNLogger.java b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/FirstNLogger.java new file mode 100644 index 000000000000..a394c657b479 --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/FirstNLogger.java @@ -0,0 +1,26 @@ +package io.airbyte.integrations.destination.destination_null.logger; + +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FirstNLogger extends BaseLogger implements NullDestinationLogger { + + private static final Logger LOGGER = LoggerFactory.getLogger(FirstNLogger.class); + + public FirstNLogger(AirbyteStreamNameNamespacePair streamNamePair, int maxEntryCount) { + super(streamNamePair, maxEntryCount); + } + + @Override + public void log(AirbyteRecordMessage recordMessage) { + if (loggedEntryCount >= maxEntryCount) { + return; + } + + loggedEntryCount += 1; + LOGGER.info(entryMessage(recordMessage)); + } + +} diff --git a/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/NullDestinationLogger.java b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/NullDestinationLogger.java new file mode 100644 index 000000000000..627ab41b46a0 --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/NullDestinationLogger.java @@ -0,0 +1,16 @@ +package io.airbyte.integrations.destination.destination_null.logger; + +import io.airbyte.protocol.models.AirbyteRecordMessage; + +public interface NullDestinationLogger { + + enum LoggingType { + NoLogging, + FirstN, + EveryNth, + RandomSampling + } + + void log(AirbyteRecordMessage recordMessage); + +} diff --git a/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/NullDestinationLoggerFactory.java b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/NullDestinationLoggerFactory.java new file mode 100644 index 000000000000..0818264b32f8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/NullDestinationLoggerFactory.java @@ -0,0 +1,45 @@ +package io.airbyte.integrations.destination.destination_null.logger; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.destination.destination_null.logger.NullDestinationLogger.LoggingType; + +public class NullDestinationLoggerFactory { + + private final JsonNode config; + + public NullDestinationLoggerFactory(JsonNode config) { + this.config = config; + } + + public NullDestinationLogger create(AirbyteStreamNameNamespacePair streamNamePair) { + if (!config.has("logging")) { + throw new IllegalArgumentException("Property logging is required, but not found"); + } + + final JsonNode logConfig = config.get("logging"); + final LoggingType loggingType = LoggingType.valueOf(logConfig.get("logging_type").asText()); + switch (loggingType) { + case NoLogging -> { + return DevNull.SINGLETON; + } + case FirstN -> { + final int maxEntryCount = logConfig.get("max_entry_count").asInt(); + return new FirstNLogger(streamNamePair, maxEntryCount); + } + case EveryNth -> { + final int nthEntryToLog = logConfig.get("nth_entry_to_log").asInt(); + final int maxEntryCount = logConfig.get("max_entry_count").asInt(); + return new EveryNthLogger(streamNamePair, nthEntryToLog, maxEntryCount); + } + case RandomSampling -> { + final double samplingRatio = logConfig.get("sampling_ratio").asDouble(); + final long seed = logConfig.has("seed") ? logConfig.get("seed").asLong() : System.currentTimeMillis(); + final int maxEntryCount = logConfig.get("max_entry_count").asInt(); + return new RandomSamplingLogger(streamNamePair, samplingRatio, seed, maxEntryCount); + } + default -> throw new IllegalArgumentException("Unexpected logging type: " + loggingType); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/RandomSamplingLogger.java b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/RandomSamplingLogger.java new file mode 100644 index 000000000000..aecb98d4f03d --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/src/main/java/io/airbyte/integrations/destination/destination_null/logger/RandomSamplingLogger.java @@ -0,0 +1,34 @@ +package io.airbyte.integrations.destination.destination_null.logger; + +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.Random; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RandomSamplingLogger extends BaseLogger implements NullDestinationLogger { + + private static final Logger LOGGER = LoggerFactory.getLogger(RandomSamplingLogger.class); + + private final double samplingRatio; + private final Random random; + + public RandomSamplingLogger(AirbyteStreamNameNamespacePair streamNamePair, double samplingRatio, long seed, int maxEntryCount) { + super(streamNamePair, maxEntryCount); + this.samplingRatio = samplingRatio; + this.random = new Random(seed); + } + + @Override + public void log(AirbyteRecordMessage recordMessage) { + if (loggedEntryCount >= maxEntryCount) { + return; + } + + if (random.nextDouble() < samplingRatio) { + loggedEntryCount += 1; + LOGGER.info(entryMessage(recordMessage)); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-null/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-null/src/main/resources/spec.json new file mode 100644 index 000000000000..55c96bba3a3e --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/src/main/resources/spec.json @@ -0,0 +1,123 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/null", + "supportsIncremental": true, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Null Destination Spec", + "type": "object", + "required": ["logging"], + "additionalProperties": false, + "properties": { + "logging": { + "title": "Log Output", + "type": "object", + "description": "Whether the data should be logged.", + "oneOf": [ + { + "title": "No Logging", + "description": "Log nothing. A pure /dev/null.", + "type": "object", + "required": ["logging_type"], + "properties": { + "logging_type": { + "type": "string", + "enum": ["NoLogging"], + "default": "NoLogging" + } + } + }, + { + "title": "First N Entries", + "description": "Log first N entries per stream.", + "type": "object", + "required": ["logging_type", "max_entry_count"], + "properties": { + "logging_type": { + "type": "string", + "enum": ["FirstN"], + "default": "FirstN" + }, + "max_entry_count": { + "title": "N", + "description": "Number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries.", + "type": "number", + "default": 100, + "examples": [100], + "minimum": 1, + "maximum": 1000 + } + } + }, + { + "title": "Every N-th Entry", + "description": "For each stream, log every N-th entry with a maximum cap.", + "type": "object", + "required": ["logging_type", "nth_entry_to_log", "max_entry_count"], + "properties": { + "logging_type": { + "type": "string", + "enum": ["EveryNth"], + "default": "EveryNth" + }, + "nth_entry_to_log": { + "title": "N", + "description": "The N-th entry to log for each stream. N starts from 1. For example, when N = 1, every entry is logged; when N = 2, every other entry is logged; when N = 3, one out of three entries is logged.", + "type": "number", + "example": [3], + "minimum": 1, + "maximum": 1000 + }, + "max_entry_count": { + "title": "Max Log Entries", + "description": "Max number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries.", + "type": "number", + "default": 100, + "examples": [100], + "minimum": 1, + "maximum": 1000 + } + } + }, + { + "title": "Random Sampling", + "description": "For each stream, randomly log a percentage of the entries with a maximum cap.", + "type": "object", + "required": ["logging_type", "sampling_ratio", "max_entry_count"], + "properties": { + "logging_type": { + "type": "string", + "enum": ["RandomSampling"], + "default": "RandomSampling" + }, + "sampling_ratio": { + "title": "Sampling Ratio", + "description": "A positive floating number smaller than 1.", + "type": "number", + "default": 0.001, + "examples": [0.001], + "minimum": 0, + "maximum": 1 + }, + "seed": { + "title": "Random Number Generator Seed", + "description": "When the seed is unspecified, the current time millis will be used as the seed.", + "type": "number", + "examples": [1900] + }, + "max_entry_count": { + "title": "Max Log Entries", + "description": "Max number of entries to log. This destination is for testing only. So it won't make sense to log infinitely. The maximum is 1,000 entries.", + "type": "number", + "default": 100, + "examples": [100], + "minimum": 1, + "maximum": 1000 + } + } + } + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-null/src/test-integration/java/io/airbyte/integrations/destination/destination_null/NullDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-null/src/test-integration/java/io/airbyte/integrations/destination/destination_null/NullDestinationAcceptanceTest.java new file mode 100644 index 000000000000..6a1263b42573 --- /dev/null +++ b/airbyte-integrations/connectors/destination-null/src/test-integration/java/io/airbyte/integrations/destination/destination_null/NullDestinationAcceptanceTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.destination_null; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.Collections; +import java.util.List; + +public class NullDestinationAcceptanceTest extends DestinationAcceptanceTest { + + @Override + protected String getImageName() { + return "airbyte/destination-null:dev"; + } + + @Override + protected JsonNode getConfig() { + final JsonNode loggingConfig = Jsons.jsonNode(ImmutableMap.builder() + .put("logging_type", "EveryNth") + .put("nth_entry_to_log", 1) + .put("max_entry_count", 3) + .build()); + return Jsons.jsonNode(Collections.singletonMap("logging", loggingConfig)); + } + + @Override + protected JsonNode getFailCheckConfig() { + final JsonNode loggingConfig = Jsons.jsonNode(ImmutableMap.builder() + .put("logging_type", "LastN") + .put("max_entry_count", 3000) + .build()); + return Jsons.jsonNode(Collections.singletonMap("logging", loggingConfig)); + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) { + return Collections.emptyList(); + } + + @Override + protected void setup(TestDestinationEnv testEnv) { + // do nothing + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + // do nothing + } + + @Override + protected void assertSameMessages(List expected, List actual, boolean pruneAirbyteInternalFields) { + // do nothing + } + +} diff --git a/docs/integrations/destinations/null.md b/docs/integrations/destinations/null.md new file mode 100644 index 000000000000..966555a7c4bd --- /dev/null +++ b/docs/integrations/destinations/null.md @@ -0,0 +1,52 @@ +# Null + +TODO: update this doc + +## Sync overview + +### Output schema + +Is the output schema fixed (e.g: for an API like Stripe)? If so, point to the connector's schema (e.g: link to Stripe’s documentation) or describe the schema here directly (e.g: include a diagram or paragraphs describing the schema). + +Describe how the connector's schema is mapped to Airbyte concepts. An example description might be: "MagicDB tables become Airbyte Streams and MagicDB columns become Airbyte Fields. In addition, an extracted\_at column is appended to each row being read." + +### Data type mapping + +This section should contain a table mapping each of the connector's data types to Airbyte types. At the moment, Airbyte uses the same types used by [JSONSchema](https://json-schema.org/understanding-json-schema/reference/index.html). `string`, `date-time`, `object`, `array`, `boolean`, `integer`, and `number` are the most commonly used data types. + +| Integration Type | Airbyte Type | Notes | +| :--- | :--- | :--- | + + +### Features + +This section should contain a table with the following format: + +| Feature | Supported?(Yes/No) | Notes | +| :--- | :--- | :--- | +| Full Refresh Sync | | | +| Incremental Sync | | | +| Replicate Incremental Deletes | | | +| For databases, WAL/Logical replication | | | +| SSL connection | | | +| SSH Tunnel Support | | | +| (Any other source-specific features) | | | + +### Performance considerations + +Could this connector hurt the user's database/API/etc... or put too much strain on it in certain circumstances? For example, if there are a lot of tables or rows in a table? What is the breaking point (e.g: 100mm> records)? What can the user do to prevent this? (e.g: use a read-only replica, or schedule frequent syncs, etc..) + +## Getting started + +### Requirements + +* What versions of this connector does this implementation support? (e.g: `postgres v3.14 and above`) +* What configurations, if any, are required on the connector? (e.g: `buffer_size > 1024`) +* Network accessibility requirements +* Credentials/authentication requirements? (e.g: A DB user with read permissions on certain tables) + +### Setup guide + +For each of the above high-level requirements as appropriate, add or point to a follow-along guide. See existing source or destination guides for an example. + +For each major cloud provider we support, also add a follow-along guide for setting up Airbyte to connect to that destination. See the Postgres destination guide for an example of what this should look like.