From 5164e12fc43c9856b2008932f793ecd19733d0ff Mon Sep 17 00:00:00 2001 From: Daemonxiao <35677990+Daemonxiao@users.noreply.github.com> Date: Thu, 21 Apr 2022 00:14:25 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20New=20source:=20TiDB=20(#11283)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add new source tidb * formate java code style and add item in SUMMARY.md * update doc * Update airbyte-integrations/connectors/source-tidb/src/main/resources/spec.json uptdate doc Co-authored-by: Xiang Zhang * Update airbyte-integrations/connectors/source-tidb/README.md * Update docs/integrations/sources/tidb.md Co-authored-by: Xiang Zhang * Update docs/integrations/sources/tidb.md Co-authored-by: Xiang Zhang * add seed and doc changelog * run format * regenerate seed file Co-authored-by: Xiang Zhang Co-authored-by: marcosmarxm --- .../resources/seed/source_definitions.yaml | 6 + .../src/main/resources/seed/source_specs.yaml | 160 ++++++++++++++++++ .../connectors/source-tidb/Dockerfile | 21 +++ .../connectors/source-tidb/README.md | 69 ++++++++ .../connectors/source-tidb/build.gradle | 36 ++++ .../integrations/source/tidb/TiDBSource.java | 83 +++++++++ .../source/tidb/TiDBSourceOperations.java | 155 +++++++++++++++++ .../source-tidb/src/main/resources/spec.json | 58 +++++++ .../source/tidb/TiDBSourceAcceptanceTest.java | 112 ++++++++++++ .../tidb/TiDBJdbcSourceAcceptanceTest.java | 80 +++++++++ .../source/tidb/TiDBSourceTests.java | 42 +++++ docs/SUMMARY.md | 3 +- docs/integrations/sources/tidb.md | 123 ++++++++++++++ 13 files changed, 947 insertions(+), 1 deletion(-) create mode 100755 airbyte-integrations/connectors/source-tidb/Dockerfile create mode 100755 airbyte-integrations/connectors/source-tidb/README.md create mode 100755 airbyte-integrations/connectors/source-tidb/build.gradle create mode 100644 airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSource.java create mode 
100644 airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java create mode 100755 airbyte-integrations/connectors/source-tidb/src/main/resources/spec.json create mode 100755 airbyte-integrations/connectors/source-tidb/src/test-integration/java/io/airbyte/integrations/source/tidb/TiDBSourceAcceptanceTest.java create mode 100755 airbyte-integrations/connectors/source-tidb/src/test/java/io/airbyte/integrations/source/tidb/TiDBJdbcSourceAcceptanceTest.java create mode 100755 airbyte-integrations/connectors/source-tidb/src/test/java/io/airbyte/integrations/source/tidb/TiDBSourceTests.java create mode 100644 docs/integrations/sources/tidb.md diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index a25acd0a4664..050406d2a99a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -789,6 +789,12 @@ documentationUrl: https://docs.airbyte.io/integrations/sources/tempo icon: tempo.svg sourceType: api +- name: TiDB + sourceDefinitionId: 0dad1a35-ccf8-4d03-b73e-6788c00b13ae + dockerRepository: airbyte/source-tidb + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/sources/tidb + sourceType: database - name: TikTok Marketing sourceDefinitionId: 4bfac00d-ce15-44ff-95b9-9e3c3e8fbd35 dockerRepository: airbyte/source-tiktok-marketing diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 5d014f25fe95..92932c037d3e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -8383,6 +8383,166 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] +- dockerImage: "airbyte/source-tidb:0.1.0" 
+ spec: + documentationUrl: "https://docs.airbyte.io/integrations/sources/tidb" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "TiDB Source Spec" + type: "object" + required: + - "host" + - "port" + - "database" + - "username" + additionalProperties: false + properties: + host: + description: "Hostname of the database." + type: "string" + order: 0 + port: + description: "Port of the database." + title: "Port" + type: "integer" + minimum: 0 + maximum: 65536 + default: 4000 + examples: + - "4000" + order: 1 + database: + description: "Name of the database." + title: "Database" + type: "string" + order: 2 + username: + description: "Username to use to access the database." + type: "string" + order: 3 + password: + description: "Password associated with the username." + title: "Password" + type: "string" + airbyte_secret: true + order: 4 + jdbc_url_params: + description: "Additional properties to pass to the JDBC URL string when\ + \ connecting to the database formatted as 'key=value' pairs separated\ + \ by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)" + title: "JDBC URL Params" + type: "string" + order: 5 + ssl: + title: "SSL Connection" + description: "Encrypt data using SSL." + type: "boolean" + default: false + order: 6 + tunnel_method: + type: "object" + title: "SSH Tunnel Method" + description: "Whether to initiate an SSH tunnel before connecting to the\ + \ database, and if so, which kind of authentication to use." 
+ oneOf: + - title: "No Tunnel" + required: + - "tunnel_method" + properties: + tunnel_method: + description: "No ssh tunnel needed to connect to database" + type: "string" + const: "NO_TUNNEL" + order: 0 + - title: "SSH Key Authentication" + required: + - "tunnel_method" + - "tunnel_host" + - "tunnel_port" + - "tunnel_user" + - "ssh_key" + properties: + tunnel_method: + description: "Connect through a jump server tunnel host using username\ + \ and ssh key" + type: "string" + const: "SSH_KEY_AUTH" + order: 0 + tunnel_host: + title: "SSH Tunnel Jump Server Host" + description: "Hostname of the jump server host that allows inbound\ + \ ssh tunnel." + type: "string" + order: 1 + tunnel_port: + title: "SSH Connection Port" + description: "Port on the proxy/jump server that accepts inbound ssh\ + \ connections." + type: "integer" + minimum: 0 + maximum: 65536 + default: 22 + examples: + - "22" + order: 2 + tunnel_user: + title: "SSH Login Username" + description: "OS-level username for logging into the jump server host." + type: "string" + order: 3 + ssh_key: + title: "SSH Private Key" + description: "OS-level user account ssh key credentials in RSA PEM\ + \ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )" + type: "string" + airbyte_secret: true + multiline: true + order: 4 + - title: "Password Authentication" + required: + - "tunnel_method" + - "tunnel_host" + - "tunnel_port" + - "tunnel_user" + - "tunnel_user_password" + properties: + tunnel_method: + description: "Connect through a jump server tunnel host using username\ + \ and password authentication" + type: "string" + const: "SSH_PASSWORD_AUTH" + order: 0 + tunnel_host: + title: "SSH Tunnel Jump Server Host" + description: "Hostname of the jump server host that allows inbound\ + \ ssh tunnel." + type: "string" + order: 1 + tunnel_port: + title: "SSH Connection Port" + description: "Port on the proxy/jump server that accepts inbound ssh\ + \ connections." 
+ type: "integer" + minimum: 0 + maximum: 65536 + default: 22 + examples: + - "22" + order: 2 + tunnel_user: + title: "SSH Login Username" + description: "OS-level username for logging into the jump server host" + type: "string" + order: 3 + tunnel_user_password: + title: "Password" + description: "OS-level password for logging into the jump server host" + type: "string" + airbyte_secret: true + order: 4 + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: [] - dockerImage: "airbyte/source-tiktok-marketing:0.1.5" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/tiktok-marketing" diff --git a/airbyte-integrations/connectors/source-tidb/Dockerfile b/airbyte-integrations/connectors/source-tidb/Dockerfile new file mode 100755 index 000000000000..6cd5e75bcfbe --- /dev/null +++ b/airbyte-integrations/connectors/source-tidb/Dockerfile @@ -0,0 +1,21 @@ +FROM airbyte/integration-base-java:dev AS build + +WORKDIR /airbyte + +ENV APPLICATION source-tidb + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar + +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte + +ENV APPLICATION source-tidb + +COPY --from=build /airbyte /airbyte + +# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile. +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-tidb diff --git a/airbyte-integrations/connectors/source-tidb/README.md b/airbyte-integrations/connectors/source-tidb/README.md new file mode 100755 index 000000000000..04405c7f8416 --- /dev/null +++ b/airbyte-integrations/connectors/source-tidb/README.md @@ -0,0 +1,69 @@ +# Source Tidb + +This is the repository for the Tidb source connector in Java. +For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/sources/tidb). 
+ +## Local development + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-tidb:build +``` + +#### Create credentials +**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`. +Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information. + +**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials. + +### Locally running the connector docker image + +#### Build +Build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-tidb:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-tidb:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-tidb:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-tidb:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-tidb:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +## Testing +We use `JUnit` for Java tests. + +### Unit and Integration Tests +Place unit tests under `src/test/...` +Place integration tests in `src/test-integration/...` + +#### Acceptance Tests +Airbyte has a standard test suite that all source connectors must pass. Implement the `TODO`s in +`src/test-integration/java/io/airbyte/integrations/source/tidb/TiDBSourceAcceptanceTest.java`.
+ +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-tidb:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-tidb:integrationTest +``` + +## Dependency Management + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/source-tidb/build.gradle b/airbyte-integrations/connectors/source-tidb/build.gradle new file mode 100755 index 000000000000..efcb2bf76b0d --- /dev/null +++ b/airbyte-integrations/connectors/source-tidb/build.gradle @@ -0,0 +1,36 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.source.tidb.TiDBSource' + applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] +} + +dependencies { + implementation project(':airbyte-db:lib') + implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:connectors:source-jdbc') + implementation project(':airbyte-integrations:connectors:source-relational-db') + + //TODO Add jdbc driver import here. 
Ex: implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14' + implementation 'mysql:mysql-connector-java:8.0.22' + + // Add testcontainers and use GenericContainer for TiDB + implementation "org.testcontainers:testcontainers:1.16.3" + + testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) + + testImplementation 'org.apache.commons:commons-lang3:3.11' + + integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-tidb') + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') + + integrationTestJavaImplementation "org.testcontainers:testcontainers:1.16.3" + + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSource.java b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSource.java new file mode 100644 index 000000000000..fea9e0c41bf1 --- /dev/null +++ b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSource.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.tidb; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import com.mysql.cj.MysqlType; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.NoOpJdbcStreamingQueryConfiguration; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.base.ssh.SshWrappedSource; +import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import java.util.List; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TiDBSource extends AbstractJdbcSource implements Source { + + private static final Logger LOGGER = LoggerFactory.getLogger(TiDBSource.class); + + static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + public static final List SSL_PARAMETERS = List.of( + "useSSL=true", + "requireSSL=true", + "verifyServerCertificate=false"); + + public static Source sshWrappedSource() { + return new SshWrappedSource(new TiDBSource(), List.of("host"), List.of("port")); + } + + public TiDBSource() { + super(DRIVER_CLASS, new NoOpJdbcStreamingQueryConfiguration(), new TiDBSourceOperations()); + } + + @Override + public JsonNode toDatabaseConfig(final JsonNode config) { + final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s", + config.get("host").asText(), + config.get("port").asText(), + config.get("database").asText())); + // the query string must start with '?'; '&' only separates the parameters that follow + if (config.get("jdbc_url_params") != null + && !config.get("jdbc_url_params").asText().isEmpty()) { + jdbcUrl.append("?").append(config.get("jdbc_url_params").asText()); + } + + // only if config ssl and ssl == true, use ssl to connect db ('?' if no parameter was added yet) + if (config.has("ssl") && config.get("ssl").asBoolean()) { + jdbcUrl.append(jdbcUrl.indexOf("?") < 0 ? "?" : "&").append(String.join("&", SSL_PARAMETERS)); + } + + final ImmutableMap.Builder configBuilder = ImmutableMap.builder() + .put("username", config.get("username").asText()) + .put("jdbc_url", 
jdbcUrl.toString()); + + if (config.has("password")) { + configBuilder.put("password", config.get("password").asText()); + } + + return Jsons.jsonNode(configBuilder.build()); + } + + @Override + public Set getExcludedInternalNameSpaces() { + return Set.of( + "information_schema", + "metrics_schema", + "performance_schema", + "mysql"); + } + + public static void main(final String[] args) throws Exception { + final Source source = TiDBSource.sshWrappedSource(); + LOGGER.info("starting source: {}", TiDBSource.class); + new IntegrationRunner(source).run(args); + LOGGER.info("completed source: {}", TiDBSource.class); + } + +} diff --git a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java new file mode 100644 index 000000000000..fedaf9a3fde6 --- /dev/null +++ b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.tidb; + +import static io.airbyte.db.jdbc.JdbcConstants.*; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.mysql.cj.MysqlType; +import com.mysql.cj.jdbc.result.ResultSetMetaData; +import com.mysql.cj.result.Field; +import io.airbyte.db.DataTypeUtils; +import io.airbyte.db.SourceOperations; +import io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations; +import io.airbyte.protocol.models.JsonSchemaType; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TiDBSourceOperations extends AbstractJdbcCompatibleSourceOperations implements SourceOperations { + + private static final Logger LOGGER = LoggerFactory.getLogger(TiDBSourceOperations.class); + + @Override + public void setJsonField(ResultSet resultSet, int colIndex, ObjectNode json) throws SQLException { + final ResultSetMetaData metaData = (ResultSetMetaData) resultSet.getMetaData(); + final Field field = metaData.getFields()[colIndex - 1]; + final String columnName = field.getName(); + final MysqlType columnType = field.getMysqlType(); + + switch (columnType) { + case BIT -> { + if (field.getLength() == 1L) { + // BIT(1) is boolean + putBoolean(json, columnName, resultSet, colIndex); + } else { + putBinary(json, columnName, resultSet, colIndex); + } + } + case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); + case TINYINT, TINYINT_UNSIGNED -> { + if (field.getLength() == 1L) { + // TINYINT(1) is boolean + putBoolean(json, columnName, resultSet, colIndex); + } else { + putShortInt(json, columnName, resultSet, colIndex); + } + } + case SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED -> putInteger(json, columnName, resultSet, colIndex); + case INT, INT_UNSIGNED -> { + if (field.isUnsigned()) { + 
putBigInt(json, columnName, resultSet, colIndex); + } else { + putInteger(json, columnName, resultSet, colIndex); + } + } + case BIGINT, BIGINT_UNSIGNED -> putBigInt(json, columnName, resultSet, colIndex); + case FLOAT, FLOAT_UNSIGNED -> putFloat(json, columnName, resultSet, colIndex); + case DOUBLE, DOUBLE_UNSIGNED -> putDouble(json, columnName, resultSet, colIndex); + case DECIMAL, DECIMAL_UNSIGNED -> putBigDecimal(json, columnName, resultSet, colIndex); + case DATE -> putDate(json, columnName, resultSet, colIndex); + case DATETIME, TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex); + case TIME -> putTime(json, columnName, resultSet, colIndex); + case YEAR -> { + final String year = resultSet.getDate(colIndex).toString().split("-")[0]; + json.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> year)); + } + case CHAR, VARCHAR -> { + if (field.isBinary()) { + // when character set is binary, the returned value is binary + putBinary(json, columnName, resultSet, colIndex); + } else { + putString(json, columnName, resultSet, colIndex); + } + } + case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> putBinary(json, columnName, resultSet, colIndex); + case TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, JSON, ENUM, SET -> putString(json, columnName, resultSet, colIndex); + case NULL -> json.set(columnName, NullNode.instance); + default -> putDefault(json, columnName, resultSet, colIndex); + } + } + + @Override + public void setStatementField(PreparedStatement preparedStatement, int parameterIndex, MysqlType cursorFieldType, String value) + throws SQLException { + switch (cursorFieldType) { + case BIT -> setBit(preparedStatement, parameterIndex, value); + case BOOLEAN -> setBoolean(preparedStatement, parameterIndex, value); + case TINYINT, TINYINT_UNSIGNED, SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED -> setInteger(preparedStatement, parameterIndex, + value); + case INT, INT_UNSIGNED, BIGINT, BIGINT_UNSIGNED -> 
setBigInteger(preparedStatement, parameterIndex, value); + case FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED -> setDouble(preparedStatement, parameterIndex, value); + case DECIMAL, DECIMAL_UNSIGNED -> setDecimal(preparedStatement, parameterIndex, value); + case DATE -> setDate(preparedStatement, parameterIndex, value); + case DATETIME, TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value); + case TIME -> setTime(preparedStatement, parameterIndex, value); + case YEAR, CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET -> setString(preparedStatement, parameterIndex, value); + case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY -> setBinary(preparedStatement, parameterIndex, value); + // since cursor are expected to be comparable, handle cursor typing strictly and error on + // unrecognized types + default -> throw new IllegalArgumentException(String.format("%s is not supported.", cursorFieldType)); + } + } + + @Override + public MysqlType getFieldType(JsonNode field) { + try { + final MysqlType literalType = MysqlType.getByName(field.get(INTERNAL_COLUMN_TYPE_NAME).asText()); + final int columnSize = field.get(INTERNAL_COLUMN_SIZE).asInt(); + + switch (literalType) { + // BIT(1) and TINYINT(1) are interpreted as boolean + case BIT, TINYINT, TINYINT_UNSIGNED -> { + if (columnSize == 1) { + return MysqlType.BOOLEAN; + } + } + // When CHAR[N] and VARCHAR[N] columns have binary character set, the returned + // types are BINARY[N] and VARBINARY[N], respectively. So we don't need to + // convert them here. This is verified in MySqlSourceDatatypeTest. + } + + return literalType; + } catch (final IllegalArgumentException ex) { + LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s (type name: %s). 
Casting to VARCHAR.", + field.get(INTERNAL_COLUMN_NAME), + field.get(INTERNAL_SCHEMA_NAME), + field.get(INTERNAL_TABLE_NAME), + field.get(INTERNAL_COLUMN_TYPE), + field.get(INTERNAL_COLUMN_TYPE_NAME))); + return MysqlType.VARCHAR; + } + } + + @Override + public JsonSchemaType getJsonType(MysqlType mysqlType) { + return switch (mysqlType) { + case + // TINYINT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link + // getFieldType} + TINYINT, TINYINT_UNSIGNED, SMALLINT, SMALLINT_UNSIGNED, INT, INT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED, BIGINT, BIGINT_UNSIGNED, FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED, DECIMAL, DECIMAL_UNSIGNED -> JsonSchemaType.NUMBER; + case BOOLEAN -> JsonSchemaType.BOOLEAN; + case NULL -> JsonSchemaType.NULL; + // BIT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link getFieldType} + case BIT, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> JsonSchemaType.STRING_BASE_64; + default -> JsonSchemaType.STRING; + }; + } + +} diff --git a/airbyte-integrations/connectors/source-tidb/src/main/resources/spec.json b/airbyte-integrations/connectors/source-tidb/src/main/resources/spec.json new file mode 100755 index 000000000000..d3363fc6c437 --- /dev/null +++ b/airbyte-integrations/connectors/source-tidb/src/main/resources/spec.json @@ -0,0 +1,58 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/sources/tidb", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "TiDB Source Spec", + "type": "object", + "required": ["host", "port", "database", "username"], + "additionalProperties": false, + "properties": { + "host": { + "description": "Hostname of the database.", + "type": "string", + "order": 0 + }, + "port": { + "description": "Port of the database.", + "title": "Port", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 4000, + "examples": ["4000"], + "order": 1 + }, + "database": { + 
"description": "Name of the database.", + "title": "Database", + "type": "string", + "order": 2 + }, + "username": { + "description": "Username to use to access the database.", + "type": "string", + "order": 3 + }, + "password": { + "description": "Password associated with the username.", + "title": "Password", + "type": "string", + "airbyte_secret": true, + "order": 4 + }, + "jdbc_url_params": { + "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)", + "title": "JDBC URL Params", + "type": "string", + "order": 5 + }, + "ssl": { + "title": "SSL Connection", + "description": "Encrypt data using SSL.", + "type": "boolean", + "default": false, + "order": 6 + } + } + } +} diff --git a/airbyte-integrations/connectors/source-tidb/src/test-integration/java/io/airbyte/integrations/source/tidb/TiDBSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-tidb/src/test-integration/java/io/airbyte/integrations/source/tidb/TiDBSourceAcceptanceTest.java new file mode 100755 index 000000000000..7dd89937de5d --- /dev/null +++ b/airbyte-integrations/connectors/source-tidb/src/test-integration/java/io/airbyte/integrations/source/tidb/TiDBSourceAcceptanceTest.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.tidb; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import io.airbyte.integrations.base.ssh.SshHelpers; +import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.protocol.models.*; +import java.util.HashMap; +import org.jooq.SQLDialect; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +public class TiDBSourceAcceptanceTest extends SourceAcceptanceTest { + + private static final String STREAM_NAME = "id_and_name"; + private static final String STREAM_NAME2 = "public.starships"; + + protected GenericContainer container; + protected JsonNode config; + + @Override + protected void setupEnvironment(final TestDestinationEnv testEnv) throws Exception { + container = new GenericContainer(DockerImageName.parse("pingcap/tidb:nightly")) + .withExposedPorts(4000); + container.start(); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put("host", "127.0.0.1") + .put("port", container.getFirstMappedPort()) + .put("username", "root") + .put("database", "test") + .build()); + final Database database = Databases.createDatabase( + config.get("username").asText(), + "", + String.format("jdbc:mysql://%s:%s/%s", + config.get("host").asText(), + config.get("port").asText(), + config.get("database").asText()), + "com.mysql.cj.jdbc.Driver", + SQLDialect.MYSQL); + + database.query(ctx -> { + ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); + ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO starships (id, name) 
VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); + return null; + }); + + database.close(); + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + container.close(); + } + + @Override + protected String getImageName() { + return "airbyte/source-tidb:dev"; + } + + @Override + protected ConnectorSpecification getSpec() throws Exception { + return SshHelpers.getSpecAndInjectSsh(); + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected ConfiguredAirbyteCatalog getConfiguredCatalog() { + return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + String.format("%s.%s", config.get("database").asText(), STREAM_NAME), + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + String.format("%s.%s", config.get("database").asText(), STREAM_NAME2), + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); + + } + + @Override + protected JsonNode getState() { + return Jsons.jsonNode(new HashMap<>()); + } + +} diff --git a/airbyte-integrations/connectors/source-tidb/src/test/java/io/airbyte/integrations/source/tidb/TiDBJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-tidb/src/test/java/io/airbyte/integrations/source/tidb/TiDBJdbcSourceAcceptanceTest.java new file mode 100755 index 
000000000000..00c0090e4714
--- /dev/null
+++ b/airbyte-integrations/connectors/source-tidb/src/test/java/io/airbyte/integrations/source/tidb/TiDBJdbcSourceAcceptanceTest.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.source.tidb;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
+import com.mysql.cj.MysqlType;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.db.Database;
+import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
+import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
+import org.junit.jupiter.api.*;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Runs the tests inherited from JdbcSourceAcceptanceTest against a TiDB instance started in a
+ * Docker container. A new container is started before each test and stopped after it.
+ */
+class TiDBJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
+
+  protected static GenericContainer<?> container;
+  protected static String USER = "root";
+  protected static String DATABASE = "test";
+
+  protected Database database;
+
+  /**
+   * Starts a TiDB container, points the connector config at its mapped port, then runs the
+   * base-class setup.
+   */
+  @BeforeEach
+  public void setup() throws Exception {
+    container = new GenericContainer<>(DockerImageName.parse("pingcap/tidb:nightly"))
+        .withExposedPorts(4000);
+    container.start();
+
+    config = Jsons.jsonNode(ImmutableMap.builder()
+        .put("host", "127.0.0.1")
+        .put("port", container.getFirstMappedPort())
+        .put("username", USER)
+        .put("database", DATABASE)
+        // .put("ssl", true)
+        .build());
+
+    super.setup();
+  }
+
+  /**
+   * Runs the base-class teardown while the container is still up, then stops the container even
+   * if that teardown throws.
+   */
+  @AfterEach
+  void tearDownTiDB() throws Exception {
+    try {
+      super.tearDown();
+    } finally {
+      stopContainer();
+    }
+  }
+
+  @Override
+  public AbstractJdbcSource getSource() {
+    return new TiDBSource();
+  }
+
+  @Override
+  public boolean supportsSchemas() {
+    return false;
+  }
+
+  @Override
+  public JsonNode getConfig() {
+    return Jsons.clone(config);
+  }
+
+  @Override
+  public String getDriverClass() {
+    return TiDBSource.DRIVER_CLASS;
+  }
+
+  @Override
+  public AbstractJdbcSource getJdbcSource() {
+    return new TiDBSource();
+  }
+
+  /** Fallback for a container that the per-test teardown did not stop. */
+  @AfterAll
+  static void cleanUp() {
+    stopContainer();
+  }
+
+  /**
+   * Stops the current container, if there is one. Safe to call when no container was started or
+   * when it has already been stopped.
+   */
+  private static void stopContainer() {
+    if (container != null) {
+      container.stop();
+      container = null;
+    }
+  }
+
+}
diff --git a/airbyte-integrations/connectors/source-tidb/src/test/java/io/airbyte/integrations/source/tidb/TiDBSourceTests.java b/airbyte-integrations/connectors/source-tidb/src/test/java/io/airbyte/integrations/source/tidb/TiDBSourceTests.java
new file mode 100755
index 000000000000..6a0137088fab
--- /dev/null
+++ b/airbyte-integrations/connectors/source-tidb/src/test/java/io/airbyte/integrations/source/tidb/TiDBSourceTests.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.source.tidb;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.protocol.models.AirbyteConnectionStatus;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class TiDBSourceTests {
+
+  private JsonNode config;
+  private GenericContainer<?> container;
+
+  /**
+   * Starts a TiDB container and asserts that the source's connection check succeeds against it.
+   * NOTE(review): despite the method name, nothing timezone-related is configured or asserted
+   * here.
+   */
+  @Test
+  public void testSettingTimezones() throws Exception {
+    container = new GenericContainer<>(DockerImageName.parse("pingcap/tidb:nightly"))
+        .withExposedPorts(4000);
+
+    try {
+      container.start();
+
+      config = Jsons.jsonNode(ImmutableMap.builder()
+          .put("host", "127.0.0.1")
+          .put("port", container.getFirstMappedPort())
+          .put("username", "root")
+          .put("database", "test")
+          .build());
+
+      AirbyteConnectionStatus check = new TiDBSource().check(config);
+
+      assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, check.getStatus());
+    } finally {
+      // Release the container even when startup, the check, or the assertion fails.
+      container.close();
+    }
+  }
+
+}
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 8632b81013bf..6800470c95f7 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -166,6 +166,7 @@ - [TikTok Marketing](integrations/sources/tiktok-marketing.md) - [Trello](integrations/sources/trello.md) - 
[Twilio](integrations/sources/twilio.md) + - [TiDB](integrations/sources/tidb.md) - [Typeform](integrations/sources/typeform.md) - [US Census API](integrations/sources/us-census.md) - [VictorOps (Sponsored by Faros AI)](integrations/sources/victorops.md) @@ -300,4 +301,4 @@ - [On Deploying](troubleshooting/on-deploying.md) - [On Setting up a New Connection](troubleshooting/new-connection.md) - [On Running a Sync](troubleshooting/running-sync.md) - - [On Upgrading](troubleshooting/on-upgrading.md) + - [On Upgrading](troubleshooting/on-upgrading.md) \ No newline at end of file diff --git a/docs/integrations/sources/tidb.md b/docs/integrations/sources/tidb.md new file mode 100644 index 000000000000..93e6864d77be --- /dev/null +++ b/docs/integrations/sources/tidb.md @@ -0,0 +1,123 @@ +# TiDB + +## Overview + +[TiDB](https://github.com/pingcap/tidb) (/’taɪdiːbi:/, "Ti" stands for Titanium) is an open-source, distributed, NewSQL database that supports Hybrid Transactional and Analytical Processing (HTAP) workloads. It is MySQL compatible and features horizontal scalability, strong consistency, and high availability. TiDB can be deployed on-premise or in-cloud. + +The TiDB source supports both Full Refresh and Incremental syncs. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run. + +### Resulting schema + +The TiDB source does not alter the schema present in your database. Depending on the destination connected to this source, however, the schema may be altered. See the destination's documentation for more details. 
+ +### Features + +| Feature | Supported | Notes | +| :---------------------------- | :-------- | :---- | +| Full Refresh Sync | Yes | | +| Incremental - Append Sync | Yes | | +| Replicate Incremental Deletes | Yes | | +| Change Data Capture | No | | +| SSL Support | Yes | | +| SSH Tunnel Connection | Yes | | + +## Getting started + +### Requirements + +1. TiDB `v4.0` or above +2. Allow connections from Airbyte to your TiDB database \(if they exist in separate VPCs\) +3. (Optional) Create a dedicated read-only Airbyte user with access to all tables needed for replication + +### Setup guide + +#### 1. Make sure your database is accessible from the machine running Airbyte + +This is dependent on your networking setup. The easiest way to verify if Airbyte is able to connect to your TiDB instance is via the check connection tool in the UI. + +#### 2. Create a dedicated read-only user with access to the relevant tables \(Recommended but optional\) + +This step is optional but highly recommended to allow for better permission control and auditing. Alternatively, you can use Airbyte with an existing user in your database. + +To create a dedicated database user, run the following commands against your database: + +```sql +CREATE USER 'airbyte'@'%' IDENTIFIED BY 'your_password_here'; +``` + +Then give it access to the relevant database: + +```sql +GRANT SELECT ON <database name>.* TO 'airbyte'@'%'; +``` + +#### 3. That's it! + +Your database user should now be ready for use with Airbyte. + +## Connection via SSH Tunnel + +Airbyte has the ability to connect to a TiDB instance via an SSH Tunnel. You might want to do this because it is not possible \(or against security policy\) to connect to the database directly \(e.g. it does not have a public IP address\). + +When using an SSH tunnel, you are configuring Airbyte to connect to an intermediate server \(a.k.a. a bastion server\) that _does_ have direct access to the database.
Airbyte connects to the bastion and then asks the bastion to connect directly to the server. + +Using this feature requires additional configuration when creating the source. We will talk through what each piece of configuration means. + +1. Configure all fields for the source as you normally would, except `SSH Tunnel Method`. +2. `SSH Tunnel Method` defaults to `No Tunnel` \(meaning a direct connection\). If you want to use an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`. + 1. Choose `SSH Key Authentication` if you will be using an RSA private key as your secret for establishing the SSH Tunnel \(see below for more information on generating this key\). + 2. Choose `Password Authentication` if you will be using a password as your secret for establishing the SSH Tunnel. +3. `SSH Tunnel Jump Server Host` refers to the intermediate \(bastion\) server that Airbyte will connect to. This should be a hostname or an IP Address. +4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. The default port for SSH connections is `22`, so unless you have explicitly changed something, go with the default. +5. `SSH Login Username` is the username that Airbyte should use when connecting to the bastion server. This is NOT the TiDB username. +6. If you are using `Password Authentication`, then `Password` should be set to the password of the user from the previous step. If you are using `SSH Key Authentication`, leave this blank. Again, this is not the TiDB password, but the password for the OS-user that Airbyte is using to perform commands on the bastion. +7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA Private Key that you are using to create the SSH connection. This should be the full contents of the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending with `-----END RSA PRIVATE KEY-----`.
+ +### Data type mapping + +[TiDB data types](https://docs.pingcap.com/tidb/stable/data-type-overview) are mapped to the following data types when synchronizing data: + +| TiDB Type | Resulting Type | Notes | +| :---------------------------------------- |:-----------------------| :----------------------------------------------------------- | +| `bit(1)` | boolean | | +| `bit(>1)` | base64 binary string | | +| `boolean` | boolean | | +| `tinyint(1)` | boolean | | +| `tinyint` | number | | +| `smallint` | number | | +| `mediumint` | number | | +| `int` | number | | +| `bigint` | number | | +| `float` | number | | +| `double` | number | | +| `decimal` | number | | +| `binary` | base64 binary string | | +| `blob` | base64 binary string | | +| `date` | string | ISO 8601 date string. ZERO-DATE value will be converted to NULL. If column is mandatory, convert to EPOCH. | +| `datetime`, `timestamp` | string | ISO 8601 datetime string. ZERO-DATE value will be converted to NULL. If column is mandatory, convert to EPOCH. | +| `time` | string | ISO 8601 time string. Values are in range between 00:00:00 and 23:59:59. | +| `year` | year string | [Doc](https://docs.pingcap.com/tidb/stable/data-type-date-and-time#year-type) | +| `char`, `varchar` with non-binary charset | string | | +| `char`, `varchar` with binary charset | base64 binary string | | +| `tinyblob` | base64 binary string | | +| `blob` | base64 binary string | | +| `mediumblob` | base64 binary string | | +| `longblob` | base64 binary string | | +| `binary` | base64 binary string | | +| `varbinary` | base64 binary string | | +| `tinytext` | string | | +| `text` | string | | +| `mediumtext` | string | | +| `longtext` | string | | +| `json` | serialized json string | E.g. `{"a": 10, "b": 15}` | +| `enum` | string | | +| `set` | string | E.g. `blue,green,yellow` | + + +**Note:** arrays for all the above types as well as custom types are supported, although they may be de-nested depending on the destination. 
+ +## Changelog + +| Version | Date | Pull Request | Subject | +| :------ | :--- | :----------- | ------- | +| 0.1.0 | 2022-04-19 | [11283](https://github.com/airbytehq/airbyte/pull/11283) | Initial Release |