diff --git a/airbyte-config/init/src/main/resources/icons/databend.svg b/airbyte-config/init/src/main/resources/icons/databend.svg new file mode 100644 index 000000000000..b6afca7ea9eb --- /dev/null +++ b/airbyte-config/init/src/main/resources/icons/databend.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 22a3651376b9..60632d14bbaf 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -418,3 +418,10 @@ documentationUrl: https://docs.airbyte.com/integrations/destinations/yugabytedb icon: yugabytedb.svg releaseStage: alpha +- name: Databend + destinationDefinitionId: 302e4d8e-08d3-4098-acd4-ac67ca365b88 + dockerRepository: airbyte/destination-databend + dockerImageTag: 0.1.0 + icon: databend.svg + documentationUrl: https://docs.airbyte.com/integrations/destinations/databend + releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index faf403561ba4..bc47b88d1148 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -7140,3 +7140,69 @@ supported_destination_sync_modes: - "overwrite" - "append" +- dockerImage: "airbyte/destination-databend:0.1.0" + spec: + documentationUrl: "https://docs.airbyte.com/integrations/destinations/databend" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "Destination Databend" + type: "object" + required: + - "host" + - "username" + - "database" + additionalProperties: true + properties: + host: + title: "Host" + description: "Hostname of the database." + type: "string" + order: 0 + protocol: + title: "Protocol" + description: "Protocol of the host." + type: "string" + examples: + - "https" + default: "https" + order: 1 + port: + title: "Port" + description: "Port of the database." + type: "integer" + minimum: 0 + maximum: 65536 + default: 443 + examples: + - "443" + order: 2 + database: + title: "DB Name" + description: "Name of the database." + type: "string" + order: 3 + table: + title: "Default Table" + description: "The default table to write to." + type: "string" + examples: + - "default" + default: "default" + order: 4 + username: + title: "User" + description: "Username to use to access the database." + type: "string" + order: 5 + password: + title: "Password" + description: "Password associated with the username."
+ type: "string" + airbyte_secret: true + order: 6 + supportsIncremental: true + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: + - "overwrite" + - "append" diff --git a/airbyte-integrations/bases/base-normalization/build.gradle b/airbyte-integrations/bases/base-normalization/build.gradle index 5030b7264723..a95816c2d0b9 100644 --- a/airbyte-integrations/bases/base-normalization/build.gradle +++ b/airbyte-integrations/bases/base-normalization/build.gradle @@ -1,5 +1,3 @@ -import java.nio.file.Paths - plugins { id 'airbyte-docker' id 'airbyte-python' diff --git a/airbyte-integrations/connectors/destination-databend/.dockerignore b/airbyte-integrations/connectors/destination-databend/.dockerignore new file mode 100644 index 000000000000..57f4cf36c057 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/.dockerignore @@ -0,0 +1,5 @@ +* +!Dockerfile +!main.py +!destination_databend +!setup.py diff --git a/airbyte-integrations/connectors/destination-databend/Dockerfile b/airbyte-integrations/connectors/destination-databend/Dockerfile new file mode 100644 index 000000000000..6619497f7f83 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3.9.11-alpine3.15 as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata build-base + + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. +RUN apk --no-cache add bash + +# copy payload code only +COPY main.py ./ +COPY destination_databend ./destination_databend + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-databend diff --git a/airbyte-integrations/connectors/destination-databend/README.md b/airbyte-integrations/connectors/destination-databend/README.md new file mode 100644 index 000000000000..8ef9f9a85c16 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/README.md @@ -0,0 +1,123 @@ +# Databend Destination + +This is the repository for the Databend destination connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/destinations/databend). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. 
To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-databend:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/destinations/databend) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_databend/spec.json` file. +Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination databend test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/destination-databend:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:destination-databend:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-databend:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-databend:dev check --config /secrets/config.json +# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages +cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-databend:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing + Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector). 
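As an illustration, a minimal custom integration test of this kind (covered in the next subsection) might look like the sketch below. The file name and test body are illustrative only, and it assumes a valid `secrets/config.json`:

```
# integration_tests/test_smoke.py -- illustrative sketch, not part of this connector
import json
import logging

from airbyte_cdk.models import Status
from destination_databend import DestinationDatabend


def test_check_succeeds_with_valid_config():
    # Assumes secrets/config.json conforms to destination_databend/spec.json
    with open("secrets/config.json", "r") as f:
        config = json.load(f)
    outcome = DestinationDatabend().check(logging.getLogger("airbyte"), config)
    assert outcome.status == Status.SUCCEEDED
```

The shipped `integration_tests/integration_test.py` exercises the same `check` call plus the full write path.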
+#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Coming soon: + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:destination-databend:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:destination-databend:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-databend/build.gradle b/airbyte-integrations/connectors/destination-databend/build.gradle new file mode 100644 index 000000000000..dd8a2bfb94e1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/build.gradle @@ -0,0 +1,8 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' +} + +airbytePython { + moduleDirectory 'destination_databend' +} diff --git a/airbyte-integrations/connectors/destination-databend/destination_databend/__init__.py b/airbyte-integrations/connectors/destination-databend/destination_databend/__init__.py new file mode 100644 index 000000000000..fe96a70e2b77 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/destination_databend/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from .destination import DestinationDatabend + +__all__ = ["DestinationDatabend"] diff --git a/airbyte-integrations/connectors/destination-databend/destination_databend/__init__.pyc b/airbyte-integrations/connectors/destination-databend/destination_databend/__init__.pyc new file mode 100644 index 000000000000..4538a0add1a4 Binary files /dev/null and b/airbyte-integrations/connectors/destination-databend/destination_databend/__init__.pyc differ diff --git a/airbyte-integrations/connectors/destination-databend/destination_databend/client.py b/airbyte-integrations/connectors/destination-databend/destination_databend/client.py new file mode 100644 index 000000000000..989cd1dd1059 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/destination_databend/client.py @@ -0,0 +1,21 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +from databend_sqlalchemy import connector + + +class DatabendClient: + def __init__(self, protocol: str, host: str, port: int, database: str, table: str, username: str, password: str = None): + self.protocol = protocol + self.host = host + self.port = port + self.database = database + self.table = table + self.username = username + self.password = password + + def open(self): + handle = connector.connect(f"{self.protocol}://{self.username}:{self.password}@{self.host}:{self.port}").cursor() + + return handle diff --git a/airbyte-integrations/connectors/destination-databend/destination_databend/destination.py b/airbyte-integrations/connectors/destination-databend/destination_databend/destination.py new file mode 100644 index 000000000000..2629ff54983a --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/destination_databend/destination.py @@ -0,0 +1,89 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import json +from datetime import datetime +from logging import getLogger +from typing import Any, Iterable, Mapping +from uuid import uuid4 + +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.destinations import Destination +from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type +from destination_databend.client import DatabendClient + +from .writer import create_databend_wirter + +logger = getLogger("airbyte") + + +class DestinationDatabend(Destination): + def write( + self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage] + ) -> Iterable[AirbyteMessage]: + + """ + TODO + Reads the input stream of messages, config, and catalog to write data to the destination. + + This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received + in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been + successfully persisted to the destination. This is used to ensure fault tolerance in the case that a sync fails before fully completing, + then the source is given the last state message output from this method as the starting point of the next sync. 
+ + :param config: dict of JSON configuration matching the configuration declared in spec.json + :param configured_catalog: The Configured Catalog describing the schema of the data being received and how it should be persisted in the + destination + :param input_messages: The stream of input messages received from the source + :return: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs + """ + streams = {s.stream.name for s in configured_catalog.streams} + client = DatabendClient(**config) + + writer = create_databend_wirter(client, logger) + + for configured_stream in configured_catalog.streams: + if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite: + writer.delete_table(configured_stream.stream.name) + logger.info(f"Stream {configured_stream.stream.name} is wiped.") + writer.create_raw_table(configured_stream.stream.name) + + for message in input_messages: + if message.type == Type.STATE: + yield message + elif message.type == Type.RECORD: + data = message.record.data + stream = message.record.stream + # Skip unselected streams + if stream not in streams: + logger.debug(f"Stream {stream} was not present in configured streams, skipping") + continue + writer.queue_write_data(stream, str(uuid4()), datetime.now(), json.dumps(data)) + + # Flush any leftover messages + writer.flush() + + def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + """ + Tests if the input configuration can be used to successfully connect to the destination with the needed permissions + e.g: if a provided API token or password can be used to connect and write to the destination. + + :param logger: Logging object to display debug/info/error to the logs + (logs will not be accessible via airbyte UI if they are not passed to this logger) + :param config: Json object containing the configuration of this destination, content of this json is as specified in + the properties of the spec.json file + + :return: AirbyteConnectionStatus indicating a Success or Failure + """ + try: + client = DatabendClient(**config) + cursor = client.open() + cursor.execute("DROP TABLE IF EXISTS test") + cursor.execute("CREATE TABLE if not exists test (x Int32,y VARCHAR)") + cursor.execute("INSERT INTO test (x,y) VALUES (%,%)", [1, "yy", 2, "xx"]) + cursor.execute("DROP TABLE IF EXISTS test") + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + except Exception as e: + return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}") diff --git a/airbyte-integrations/connectors/destination-databend/destination_databend/spec.json b/airbyte-integrations/connectors/destination-databend/destination_databend/spec.json new file mode 100644 index 000000000000..9605df680ada --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/destination_databend/spec.json @@ -0,0 +1,80 @@ +{ + "documentationUrl" : "https://docs.airbyte.com/integrations/destinations/databend", + "supported_destination_sync_modes" : [ + "overwrite", + "append" + ], + "supportsIncremental" : true, + "supportsDBT" : false, + "supportsNormalization" : false, + "connectionSpecification" : { + "$schema" : "http://json-schema.org/draft-07/schema#", + "title" : "Destination Databend", + "type" : "object", + "required" : [ + "host", + "username", + "database" + ], + "additionalProperties" : true, + "properties" : { + "host" : { + "title" : "Host", + "description" : "Hostname of the database.", + "type" : "string", + "order" : 0 + }, + "protocol" : { + 
"title" : "Protocol", + "description" : "Protocol of the host.", + "type" : "string", + "examples" : [ + "https" + ], + "default" : "https", + "order" : 1 + }, + "port" : { + "title" : "Port", + "description" : "Port of the database.", + "type" : "integer", + "minimum" : 0, + "maximum" : 65536, + "default" : 443, + "examples" : [ + "443" + ], + "order" : 2 + }, + "database" : { + "title" : "DB Name", + "description" : "Name of the database.", + "type" : "string", + "order" : 3 + }, + "table" : { + "title" : "Default Table", + "description" : "The default table was written to.", + "type" : "string", + "examples" : [ + "default" + ], + "default" : "default", + "order" : 4 + }, + "username" : { + "title" : "User", + "description" : "Username to use to access the database.", + "type" : "string", + "order" : 5 + }, + "password" : { + "title" : "Password", + "description" : "Password associated with the username.", + "type" : "string", + "airbyte_secret" : true, + "order" : 6 + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-databend/destination_databend/writer.py b/airbyte-integrations/connectors/destination-databend/destination_databend/writer.py new file mode 100644 index 000000000000..a9c4dfe57da1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/destination_databend/writer.py @@ -0,0 +1,134 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from collections import defaultdict +from datetime import datetime +from itertools import chain + +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.models import AirbyteConnectionStatus, Status +from destination_databend.client import DatabendClient + + +class DatabendWriter: + """ + Base class for shared writer logic. + """ + + flush_interval = 1000 + + def __init__(self, client: DatabendClient) -> None: + """ + :param client: Databend SDK connection class with established connection + to the databse. + """ + try: + # open a cursor and do some work with it + self.client = client + self.cursor = client.open() + self._buffer = defaultdict(list) + self._values = 0 + except Exception as e: + # handle the exception + raise AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}") + finally: + # close the cursor + self.cursor.close() + + def delete_table(self, name: str) -> None: + """ + Delete the resulting table. + Primarily used in Overwrite strategy to clean up previous data. + + :param name: table name to delete. + """ + self.cursor.execute(f"DROP TABLE IF EXISTS _airbyte_raw_{name}") + + def create_raw_table(self, name: str): + """ + Create the resulting _airbyte_raw table. + + :param name: table name to create. + """ + query = f""" + CREATE TABLE IF NOT EXISTS _airbyte_raw_{name} ( + _airbyte_ab_id TEXT, + _airbyte_emitted_at TIMESTAMP, + _airbyte_data TEXT + ) + """ + cursor = self.cursor + cursor.execute(query) + + def queue_write_data(self, stream_name: str, id: str, time: datetime, record: str) -> None: + """ + Queue up data in a buffer in memory before writing to the database. + When flush_interval is reached data is persisted. + + :param stream_name: name of the stream for which the data corresponds. + :param id: unique identifier of this data row. + :param time: time of writing. + :param record: string representation of the json data payload. 
+ """ + self._buffer[stream_name].append((id, time, record)) + self._values += 1 + if self._values == self.flush_interval: + self._flush() + + def _flush(self): + """ + Stub for the intermediate data flush that's triggered during the + buffering operation. + """ + raise NotImplementedError() + + def flush(self): + """ + Stub for the data flush at the end of writing operation. + """ + raise NotImplementedError() + + +class DatabendSQLWriter(DatabendWriter): + """ + Data writer using the SQL writing strategy. Data is buffered in memory + and flushed using INSERT INTO SQL statement. + """ + + flush_interval = 1000 + + def __init__(self, client: DatabendClient) -> None: + """ + :param client: Databend SDK connection class with established connection + to the databse. + """ + super().__init__(client) + + def _flush(self) -> None: + """ + Intermediate data flush that's triggered during the + buffering operation. Writes data stored in memory via SQL commands. + databend connector insert into table using stage + """ + cursor = self.cursor + # id, written_at, data + for table, data in self._buffer.items(): + cursor.execute( + f"INSERT INTO _airbyte_raw_{table} (_airbyte_ab_id,_airbyte_emitted_at,_airbyte_data) VALUES (%, %, %)", + list(chain.from_iterable(data)), + ) + self._buffer.clear() + self._values = 0 + + def flush(self) -> None: + """ + Final data flush after all data has been written to memory. + """ + self._flush() + + +def create_databend_wirter(client: DatabendClient, logger: AirbyteLogger) -> DatabendWriter: + logger.info("Using the SQL writing strategy") + writer = DatabendSQLWriter(client) + return writer diff --git a/airbyte-integrations/connectors/destination-databend/integration_tests/integration_test.py b/airbyte-integrations/connectors/destination-databend/integration_tests/integration_test.py new file mode 100644 index 000000000000..913a0909366a --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/integration_tests/integration_test.py @@ -0,0 +1,159 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +import json +import logging +from typing import Any, Dict, List, Mapping + +import pytest +from airbyte_cdk.models import ( + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStateMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + Status, + SyncMode, + Type, +) +from destination_databend import DestinationDatabend +from destination_databend.client import DatabendClient + + +@pytest.fixture(name="config") +def config_fixture() -> Mapping[str, Any]: + with open("secrets/config.json", "r") as f: + return json.loads(f.read()) + + +@pytest.fixture(name="configured_catalog") +def configured_catalog_fixture() -> ConfiguredAirbyteCatalog: + stream_schema = {"type": "object", "properties": {"string_col": {"type": "str"}, "int_col": {"type": "integer"}}} + + append_stream = ConfiguredAirbyteStream( + stream=AirbyteStream(name="append_stream", json_schema=stream_schema, supported_sync_modes=[SyncMode.incremental]), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ) + + overwrite_stream = ConfiguredAirbyteStream( + stream=AirbyteStream(name="overwrite_stream", json_schema=stream_schema, supported_sync_modes=[SyncMode.incremental]), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.overwrite, + ) + + return ConfiguredAirbyteCatalog(streams=[append_stream, overwrite_stream]) + + +@pytest.fixture(autouse=True) +def teardown(config: Mapping): + yield + client = DatabendClient(**config) + cursor = client.open() + cursor.close() + + +@pytest.fixture(name="client") +def client_fixture(config) -> DatabendClient: + return DatabendClient(**config) + + +def test_check_valid_config(config: Mapping): + outcome = DestinationDatabend().check(logging.getLogger('airbyte'), config) + assert outcome.status == Status.SUCCEEDED + + +def test_check_invalid_config(): + outcome = DestinationDatabend().check(logging.getLogger('airbyte'), {"bucket_id": "not_a_real_id"}) + assert outcome.status == Status.FAILED + + +def _state(data: Dict[str, Any]) -> AirbyteMessage: + return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=data)) + + +def _record(stream: str, str_value: str, int_value: int) -> AirbyteMessage: + return AirbyteMessage( + type=Type.RECORD, record=AirbyteRecordMessage(stream=stream, data={"str_col": str_value, "int_col": int_value}, emitted_at=0) + ) + + +def retrieve_records(stream_name: str, client: DatabendClient) -> List[AirbyteRecordMessage]: + cursor = client.open() + cursor.execute(f"select * from _airbyte_raw_{stream_name}") + all_records = cursor.fetchall() + out = [] + for record in all_records: + # key = record[0] + # stream = key.split("__ab__")[0] + value = json.loads(record[2]) + out.append(_record(stream_name, value["str_col"], value["int_col"])) + return out + + +def retrieve_all_records(client: DatabendClient) -> List[AirbyteRecordMessage]: + """retrieves and formats all records in databend as Airbyte messages""" + overwrite_stream = "overwrite_stream" + append_stream = "append_stream" + overwrite_out = retrieve_records(overwrite_stream, client) + append_out = retrieve_records(append_stream, client) + return overwrite_out + append_out + + +def test_write(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog, client: DatabendClient): + """ + This test verifies that: + 1. writing a stream in "overwrite" mode overwrites any existing data for that stream + 2. writing a stream in "append" mode appends new records without deleting the old ones + 3. 
The correct state message is output by the connector at the end of the sync + """ + append_stream, overwrite_stream = configured_catalog.streams[0].stream.name, configured_catalog.streams[1].stream.name + first_state_message = _state({"state": "1"}) + first_record_chunk = [_record(append_stream, str(i), i) for i in range(5)] + [_record(overwrite_stream, str(i), i) for i in range(5)] + + second_state_message = _state({"state": "2"}) + second_record_chunk = [_record(append_stream, str(i), i) for i in range(5, 10)] + [ + _record(overwrite_stream, str(i), i) for i in range(5, 10) + ] + + destination = DestinationDatabend() + + expected_states = [first_state_message, second_state_message] + output_states = list( + destination.write( + config, configured_catalog, [*first_record_chunk, first_state_message, *second_record_chunk, second_state_message] + ) + ) + assert expected_states == output_states, "Checkpoint state messages were expected from the destination" + + expected_records = [_record(append_stream, str(i), i) for i in range(10)] + [_record(overwrite_stream, str(i), i) for i in range(10)] + records_in_destination = retrieve_all_records(client) + assert len(expected_records) == len(records_in_destination), "Records in destination should match records expected" + + # After this sync we expect the append stream to have 15 messages and the overwrite stream to have 5 + third_state_message = _state({"state": "3"}) + third_record_chunk = [_record(append_stream, str(i), i) for i in range(10, 15)] + [ + _record(overwrite_stream, str(i), i) for i in range(10, 15) + ] + + output_states = list(destination.write(config, configured_catalog, [*third_record_chunk, third_state_message])) + assert [third_state_message] == output_states + + records_in_destination = retrieve_all_records(client) + expected_records = [_record(append_stream, str(i), i) for i in range(15)] + [ + _record(overwrite_stream, str(i), i) for i in range(10, 15) + ] + assert len(expected_records) == len(records_in_destination) + + tear_down(client) + + +def tear_down(client: DatabendClient): + overwrite_stream = "overwrite_stream" + append_stream = "append_stream" + cursor = client.open() + cursor.execute(f"DROP table _airbyte_raw_{overwrite_stream}") + cursor.execute(f"DROP table _airbyte_raw_{append_stream}") diff --git a/airbyte-integrations/connectors/destination-databend/integration_tests/sample_config.json b/airbyte-integrations/connectors/destination-databend/integration_tests/sample_config.json new file mode 100644 index 000000000000..cc8ac8584d94 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/integration_tests/sample_config.json @@ -0,0 +1,9 @@ +{ + "protocol" : "https", + "host" : "tnc7yee14--xxxx.ch.datafusecloud.com", + "port" : 443, + "username" : "username", + "password" : "password", + "database" : "default", + "table" : "default" +} diff --git a/airbyte-integrations/connectors/destination-databend/main.py b/airbyte-integrations/connectors/destination-databend/main.py new file mode 100644 index 000000000000..17cced87eeb9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/main.py @@ -0,0 +1,11 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + + +import sys + +from destination_databend import DestinationDatabend + +if __name__ == "__main__": + DestinationDatabend().run(sys.argv[1:]) diff --git a/airbyte-integrations/connectors/destination-databend/requirements.txt b/airbyte-integrations/connectors/destination-databend/requirements.txt new file mode 100644 index 000000000000..d6e1198b1ab1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/requirements.txt @@ -0,0 +1 @@ +-e . diff --git a/airbyte-integrations/connectors/destination-databend/setup.py b/airbyte-integrations/connectors/destination-databend/setup.py new file mode 100644 index 000000000000..7aa721c49fdd --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/setup.py @@ -0,0 +1,22 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = ["airbyte-cdk", "requests", "databend-sqlalchemy"] + +TEST_REQUIREMENTS = ["pytest~=6.1"] +setup( + name="destination_databend", + description="Destination implementation for Databend.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/destination-databend/unit_tests/test_databend_destination.py b/airbyte-integrations/connectors/destination-databend/unit_tests/test_databend_destination.py new file mode 100644 index 000000000000..eb6bbbffe616 --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/unit_tests/test_databend_destination.py @@ -0,0 +1,162 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from datetime import datetime +from typing import Dict +from unittest.mock import AsyncMock, MagicMock, call, patch + +from airbyte_cdk.models import ( + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, + Type, +) +from destination_databend.destination import DatabendClient, DestinationDatabend +from pytest import fixture + + +@fixture +def logger() -> MagicMock: + return MagicMock() + + +@fixture +def config() -> Dict[str, str]: + args = { + "database": "default", + "username": "root", + "password": "root", + "host": "localhost", + "protocol": "http", + "port": 8081, + "table": "default", + } + return args + + +@fixture(name="mock_connection") +def async_connection_cursor_mock(): + connection = MagicMock() + cursor = AsyncMock() + connection.cursor.return_value = cursor + return connection, cursor + + +@fixture +def configured_stream1() -> ConfiguredAirbyteStream: + return ConfiguredAirbyteStream( + stream=AirbyteStream( + name="table1", + json_schema={ + "type": "object", + "properties": {"col1": {"type": "string"}, "col2": {"type": "integer"}}, + }, + supported_sync_modes=[SyncMode.incremental], + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ) + + +@fixture +def configured_stream2() -> ConfiguredAirbyteStream: + return ConfiguredAirbyteStream( + stream=AirbyteStream( + name="table2", + json_schema={ + "type": "object", + "properties": {"col1": {"type": "string"}, "col2": {"type": "integer"}}, + }, + supported_sync_modes=[SyncMode.incremental], + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ) + + +@fixture +def airbyte_message1() -> AirbyteMessage: + return AirbyteMessage( + 
type=Type.RECORD, + record=AirbyteRecordMessage( + stream="table1", + data={"key1": "value1", "key2": 2}, + emitted_at=int(datetime.now().timestamp()) * 1000, + ), + ) + + +@fixture +def airbyte_message2() -> AirbyteMessage: + return AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + stream="table2", + data={"key1": "value2", "key2": 3}, + emitted_at=int(datetime.now().timestamp()) * 1000, + ), + ) + + +@fixture +def airbyte_state_message() -> AirbyteMessage: + return AirbyteMessage(type=Type.STATE) + + +@patch("destination_databend.client.DatabendClient", MagicMock()) +def test_connection(config: Dict[str, str], logger: MagicMock) -> None: + # Check no log object + DatabendClient(**config) + + +@patch("destination_databend.writer.DatabendSQLWriter") +@patch("destination_databend.client.DatabendClient") +def test_sql_write_append( + mock_connection: MagicMock, + mock_writer: MagicMock, + config: Dict[str, str], + configured_stream1: ConfiguredAirbyteStream, + configured_stream2: ConfiguredAirbyteStream, + airbyte_message1: AirbyteMessage, + airbyte_message2: AirbyteMessage, + airbyte_state_message: AirbyteMessage, +) -> None: + catalog = ConfiguredAirbyteCatalog(streams=[configured_stream1, configured_stream2]) + + destination = DestinationDatabend() + result = destination.write(config, catalog, [airbyte_message1, airbyte_state_message, airbyte_message2]) + + assert list(result) == [airbyte_state_message] + mock_writer.return_value.delete_table.assert_not_called() + mock_writer.return_value.create_raw_table.assert_has_calls([call("table1"), call("table2")]) + assert len(mock_writer.return_value.queue_write_data.mock_calls) == 2 + mock_writer.return_value.flush.assert_called_once() + + +@patch("destination_databend.writer.DatabendSQLWriter") +@patch("destination_databend.client.DatabendClient") +def test_sql_write_overwrite( + mock_connection: MagicMock, + mock_writer: MagicMock, + config: Dict[str, str], + configured_stream1: ConfiguredAirbyteStream, + configured_stream2: ConfiguredAirbyteStream, + airbyte_message1: AirbyteMessage, + airbyte_message2: AirbyteMessage, + airbyte_state_message: AirbyteMessage, +): + # Overwrite triggers a delete + configured_stream1.destination_sync_mode = DestinationSyncMode.overwrite + catalog = ConfiguredAirbyteCatalog(streams=[configured_stream1, configured_stream2]) + + destination = DestinationDatabend() + result = destination.write(config, catalog, [airbyte_message1, airbyte_state_message, airbyte_message2]) + + assert list(result) == [airbyte_state_message] + mock_writer.return_value.delete_table.assert_called_once_with("table1") + mock_writer.return_value.create_raw_table.assert_has_calls([call("table1"), call("table2")]) diff --git a/airbyte-integrations/connectors/destination-databend/unit_tests/test_writer.py b/airbyte-integrations/connectors/destination-databend/unit_tests/test_writer.py new file mode 100644 index 000000000000..5412bd26e36b --- /dev/null +++ b/airbyte-integrations/connectors/destination-databend/unit_tests/test_writer.py @@ -0,0 +1,46 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+# + +from typing import Any +from unittest.mock import MagicMock + +from destination_databend.writer import DatabendSQLWriter +from pytest import fixture, mark + + +@fixture +def client() -> MagicMock: + return MagicMock() + + +@fixture +def sql_writer(client: MagicMock) -> DatabendSQLWriter: + return DatabendSQLWriter(client) + + +def test_sql_default(sql_writer: DatabendSQLWriter) -> None: + assert len(sql_writer._buffer) == 0 + assert sql_writer.flush_interval == 1000 + + +@mark.parametrize("writer", ["sql_writer"]) +def test_sql_create(client: MagicMock, writer: str, request: Any) -> None: + writer = request.getfixturevalue(writer) + writer.create_raw_table("dummy") + + +def test_data_buffering(sql_writer: DatabendSQLWriter) -> None: + sql_writer.queue_write_data("dummy", "id1", 20200101, '{"key": "value"}') + assert sql_writer._buffer["dummy"][0] == ("id1", 20200101, '{"key": "value"}') + assert len(sql_writer._buffer["dummy"]) == 1 + assert len(sql_writer._buffer.keys()) == 1 + sql_writer.queue_write_data("dummy", "id2", 20200102, '{"key2": "value2"}') + assert sql_writer._buffer["dummy"][1] == ("id2", 20200102, '{"key2": "value2"}') + assert len(sql_writer._buffer["dummy"]) == 2 + assert len(sql_writer._buffer.keys()) == 1 + sql_writer.queue_write_data("dummy2", "id3", 20200103, '{"key3": "value3"}') + assert sql_writer._buffer["dummy2"][0] == ("id3", 20200103, '{"key3": "value3"}') + assert len(sql_writer._buffer["dummy"]) == 2 + assert len(sql_writer._buffer["dummy2"]) == 1 + assert len(sql_writer._buffer.keys()) == 2 diff --git a/docs/integrations/destinations/databend.md b/docs/integrations/destinations/databend.md new file mode 100644 index 000000000000..08339cf0ab32 --- /dev/null +++ b/docs/integrations/destinations/databend.md @@ -0,0 +1,54 @@ +# Databend + +This page guides you through the process of setting up the [Databend](https://databend.rs/) destination connector. + +## Features + +| Feature | Supported?\(Yes/No\) | Notes | +| :--- | :--- | :--- | +| Full Refresh Sync | Yes | | +| Incremental - Append Sync | Yes | | + + +#### Output Schema + +Each stream will be output into its own table in Databend. Each table will contain 3 columns: + +* `_airbyte_ab_id`: a UUID assigned by Airbyte to each event that is processed. The column type in Databend is `VARCHAR`. +* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in Databend is `TIMESTAMP`. +* `_airbyte_data`: a JSON blob containing the event data. The column type in Databend is `VARCHAR`. + +## Getting Started +You can follow the [Connecting to a Warehouse docs](https://docs.databend.com/using-databend-cloud/warehouses/connecting-a-warehouse) to get the username, password, host, etc. + +Make sure the Databend user has the following permissions: + +* can create tables and write rows. +* can create databases. + +You can create such a user by running: + +``` +GRANT CREATE ON * TO airbyte_user; +``` + +You can also use a pre-existing user, but we highly recommend creating a dedicated user for Airbyte. + + +#### Target Database + +You will need to choose an existing database or create a new database that will be used to store synced data from Airbyte. + +### Setup the Databend Destination in Airbyte + +You should now have all the requirements needed to configure Databend as a destination in the UI.
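Once the destination is configured, a quick way to sanity-check a sync is to query one of the raw tables directly. The sketch below is illustrative only: it reuses the `databend-sqlalchemy` connection style from `destination_databend/client.py`, and the host, credentials, and stream name are placeholders (the settings you will enter in the UI are listed right after the sketch):

```
# Illustrative sketch -- replace the placeholder credentials and stream name.
from databend_sqlalchemy import connector

cursor = connector.connect("https://airbyte_user:password@host.example.com:443").cursor()

# Each synced stream lands in a raw table named _airbyte_raw_<stream_name>.
cursor.execute("SELECT _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data FROM _airbyte_raw_my_stream LIMIT 5")
print(cursor.fetchall())
```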
You'll need the following information to configure the Databend destination: + +* **Host** +* **Port** +* **Username** +* **Password** +* **Database** + + +## Changelog +######TODO: more info \ No newline at end of file